telegram-bot-mcp/docs/nexus-mcp-bridge-spec.md
2026-03-29 23:32:03 +00:00

12 KiB

Nexus MCP Bridge: claude.ai ↔ Homelab Agent

Author: Mikkel Georgsen (msgeorgsen@gmail.com)
Date: 2026-03-30
Language: Python
Runs on: LXC 102 (mgmt)
Transport: Telegram bot (existing) or direct HTTP over NetBird


1. Purpose

Let Claude (in claude.ai) dispatch tasks to and pull updates from the homelab management agent (Claude Code on LXC 102) without Mikkel being the copy-paste middleman.


2. Architecture

┌──────────────┐     ┌──────────────────┐     ┌──────────────────┐
│  claude.ai   │     │  MCP Bridge      │     │  Homelab Agent   │
│  (this chat) │────▶│  (Python on mgmt)│────▶│  (Claude Code)   │
│              │◀────│                  │◀────│                  │
│  MCP client  │     │  Queue + Dedup   │     │  Telegram bot    │
└──────────────┘     └──────────────────┘     └──────────────────┘
                          │
                     Hosted on LXC 102
                     Accessible via NetBird mesh

3. Queue Model

The queue accepts multiple messages but deduplicates identical ones. The agent triages incoming messages and announces which one it's responding to. No locking — the agent handles interrupts per its system prompt rules.

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import hashlib


class MessageStatus(Enum):
    PENDING = "pending"         # In queue, not yet picked up
    ACKNOWLEDGED = "acknowledged"  # Agent has seen it
    IN_PROGRESS = "in_progress"    # Agent is actively working on it
    COMPLETE = "complete"       # Done
    ERROR = "error"             # Failed


@dataclass
class QueueMessage:
    id: str                     # Auto-generated UUID
    content: str                # The task/message text
    content_hash: str           # SHA256 of content for dedup
    status: MessageStatus = MessageStatus.PENDING
    created_at: datetime = field(default_factory=datetime.utcnow)
    acknowledged_at: datetime | None = None
    completed_at: datetime | None = None


@dataclass
class AgentUpdate:
    id: str                     # Auto-generated UUID
    message_id: str | None      # Which queued message this relates to
    content: str                # The status update text
    created_at: datetime = field(default_factory=datetime.utcnow)


@dataclass
class Queue:
    messages: list[QueueMessage] = field(default_factory=list)
    updates: list[AgentUpdate] = field(default_factory=list)
    
    def enqueue(self, content: str) -> QueueMessage | None:
        """Add a message. Returns None if duplicate exists."""
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        
        # Check for duplicate among unprocessed messages
        for msg in self.messages:
            if (msg.content_hash == content_hash 
                and msg.status in (MessageStatus.PENDING, 
                                   MessageStatus.ACKNOWLEDGED,
                                   MessageStatus.IN_PROGRESS)):
                return None  # Duplicate — reject
        
        msg = QueueMessage(
            id=str(uuid4()),
            content=content,
            content_hash=content_hash,
        )
        self.messages.append(msg)
        return msg
    
    def pull_updates(self, since: datetime | None = None) -> list[AgentUpdate]:
        """Get all updates since a timestamp."""
        if since is None:
            return self.updates
        return [u for u in self.updates if u.created_at > since]
    
    def status(self) -> dict:
        """Current queue state."""
        return {
            "pending": len([m for m in self.messages 
                          if m.status == MessageStatus.PENDING]),
            "in_progress": len([m for m in self.messages 
                              if m.status == MessageStatus.IN_PROGRESS]),
            "total_updates": len(self.updates),
            "current_task": next(
                (m.content for m in self.messages 
                 if m.status == MessageStatus.IN_PROGRESS), 
                None
            ),
        }

4. MCP Server Tools

Three tools exposed to claude.ai:

send_message

Send a task/message to the homelab agent.
Rejected if an identical message is already pending or in progress.

Input:  { "message": "Fix the nexus.mg DNS record to 100.79.65.206" }
Output: { "queued": true, "id": "abc-123" }
   or:  { "queued": false, "reason": "duplicate" }

pull_updates

Get all status updates from the agent since last pull.
Returns step-by-step reports, completions, errors.

Input:  {} (or { "since": "2026-03-30T01:00:00Z" })
Output: {
  "updates": [
    {
      "message_id": "abc-123",
      "content": "[Step 1] Checking Technitium... ✅ Record found",
      "created_at": "2026-03-30T01:02:15Z"
    },
    ...
  ]
}

queue_status

Check the current state of the queue.

Input:  {}
Output: {
  "pending": 1,
  "in_progress": 1,
  "total_updates": 5,
  "current_task": "Fix the nexus.mg DNS record to 100.79.65.206"
}

5. MCP Server Implementation

# mcp_bridge/server.py
import json
import asyncio
from datetime import datetime
from mcp.server import Server
from mcp.types import Tool, TextContent

from .queue import Queue, MessageStatus
from .telegram_transport import TelegramTransport


app = Server("homelab-bridge")
queue = Queue()
transport = TelegramTransport()  # Wraps existing ~/bin/telegram


@app.tool()
async def send_message(message: str) -> str:
    """Send a task to the homelab agent. Rejects duplicates."""
    msg = queue.enqueue(message)
    if msg is None:
        return json.dumps({"queued": False, "reason": "duplicate"})
    
    # Dispatch to agent via Telegram
    await transport.send(message)
    
    return json.dumps({"queued": True, "id": msg.id})


@app.tool()
async def pull_updates(since: str | None = None) -> str:
    """Pull status updates from the agent since a given timestamp."""
    since_dt = datetime.fromisoformat(since) if since else None
    updates = queue.pull_updates(since_dt)
    
    return json.dumps({
        "updates": [
            {
                "message_id": u.message_id,
                "content": u.content,
                "created_at": u.created_at.isoformat(),
            }
            for u in updates
        ]
    })


@app.tool()
async def queue_status() -> str:
    """Check current queue state."""
    return json.dumps(queue.status())


# Background task: poll Telegram for agent responses
async def poll_agent_responses():
    """Watch for incoming Telegram messages from the agent."""
    async for message in transport.receive():
        # Try to match to a queued task
        current = next(
            (m for m in queue.messages 
             if m.status == MessageStatus.IN_PROGRESS),
            None
        )
        queue.updates.append(AgentUpdate(
            id=str(uuid4()),
            message_id=current.id if current else None,
            content=message,
        ))


if __name__ == "__main__":
    import asyncio
    from mcp.server.stdio import stdio_server
    
    async def main():
        asyncio.create_task(poll_agent_responses())
        async with stdio_server() as (read, write):
            await app.run(read, write)
    
    asyncio.run(main())

6. Telegram Transport

Wraps the existing ~/bin/telegram script or hits the Telegram Bot API directly.

# mcp_bridge/telegram_transport.py
import asyncio
import aiohttp
import os


class TelegramTransport:
    def __init__(self):
        self.bot_token = self._load_token()
        self.chat_id = self._load_chat_id()  # The agent's Telegram chat
        self.base_url = f"https://api.telegram.org/bot{self.bot_token}"
        self.last_update_id = 0
    
    def _load_token(self) -> str:
        creds_path = os.path.expanduser(
            "~/homelab/telegram/credentials"
        )
        with open(creds_path) as f:
            # Parse token from credentials file
            for line in f:
                if "token" in line.lower() or "bot" in line.lower():
                    return line.split("=", 1)[1].strip()
            return f.read().strip()
    
    def _load_chat_id(self) -> str:
        creds_path = os.path.expanduser(
            "~/homelab/telegram/credentials"
        )
        with open(creds_path) as f:
            for line in f:
                if "chat_id" in line.lower():
                    return line.split("=", 1)[1].strip()
        return ""
    
    async def send(self, message: str):
        """Send a message to the agent via Telegram."""
        async with aiohttp.ClientSession() as session:
            await session.post(
                f"{self.base_url}/sendMessage",
                json={
                    "chat_id": self.chat_id,
                    "text": message,
                    "parse_mode": "Markdown",
                },
            )
    
    async def receive(self):
        """Async generator that yields incoming messages."""
        async with aiohttp.ClientSession() as session:
            while True:
                resp = await session.get(
                    f"{self.base_url}/getUpdates",
                    params={
                        "offset": self.last_update_id + 1,
                        "timeout": 30,  # Long polling
                    },
                )
                data = await resp.json()
                
                for update in data.get("result", []):
                    self.last_update_id = update["update_id"]
                    msg = update.get("message", {})
                    text = msg.get("text", "")
                    if text:
                        yield text
                
                await asyncio.sleep(1)

7. Agent-Side Integration

The agent's system prompt (CLAUDE.md) includes awareness of the queue:

## Message Queue

You may receive messages from multiple sources:
- Direct Telegram messages from Mikkel
- Queued tasks from claude.ai via the MCP bridge

When you receive a new message while working on a task:
1. Announce: "[New message received: <summary>]"
2. Assess urgency — if it says URGENT or STOP, halt immediately
3. If non-urgent, acknowledge and continue current task
4. Address queued messages in order after current task completes

Always announce which message you're responding to:
"[Responding to: <first line of message>]"

8. Deployment

# On LXC 102 (mgmt)
cd ~/homelab
mkdir -p mcp-bridge
cd mcp-bridge

# Create venv
python3 -m venv .venv
source .venv/bin/activate

# Install dependencies
pip install mcp aiohttp

# Copy the server files
# (server.py, queue.py, telegram_transport.py)

# Test locally
python -m mcp_bridge.server

As a systemd service:

[Unit]
Description=Nexus MCP Bridge
After=network.target

[Service]
Type=simple
User=mikkel
WorkingDirectory=/home/mikkel/homelab/mcp-bridge
ExecStart=/home/mikkel/homelab/mcp-bridge/.venv/bin/python -m mcp_bridge.server
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target

MCP registration in claude.ai:

The MCP server needs to be accessible from claude.ai. Options:

  1. HTTP transport — Expose via NPM at mcp.georgsen.dk with SSE
  2. Direct registration — If claude.ai supports remote MCP URLs

The HTTP/SSE transport is the most practical since claude.ai's MCP connections expect a URL endpoint.


9. Future: Sub-Agent Dispatch

Once the bridge works for the homelab agent, extend it to dispatch to Nexus project agents. The queue becomes multi-target:

send_message(target="homelab", message="...")
send_message(target="nexus:felt", message="...")
send_message(target="nexus:sentry", message="...")

Each target has its own queue and its own agent on the other end. This is where Nexus's project management layer comes in — the PM agent decides how to route work to specialist agents.


---

## 10. Why Python

- MCP SDK has first-class Python support
- Telegram bot integration already Python on mgmt (~/venv)
- Thin glue layer, not performance-critical
- ~50 messages/day between three participants
- Fastest to iterate on while figuring out queue semantics
- Agent already has Python set up and ready