feat: add ingest API + health endpoint, fix bot-to-bot logging
Telegram bots can't see messages from other bots in groups.

Added:
- POST /api/ingest - local services log messages into bridge DB
- GET /api/health - status check endpoint
- Fixed post_init not running (manual init lifecycle)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
1cb16e6e8f
commit
494bb510d3
3 changed files with 303 additions and 9 deletions
219
docs/nexus-mcp-bridge-addendum.md
Normal file
219
docs/nexus-mcp-bridge-addendum.md
Normal file
|
|
@ -0,0 +1,219 @@
|
||||||
|
# MCP Bridge Spec — Addendum: Session Tracking & Per-Chat Context
|
||||||
|
|
||||||
|
Add these sections to the existing nexus-mcp-bridge-spec.md
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 11. Session Tracking
|
||||||
|
|
||||||
|
Claude Code sessions are ephemeral. If a session dies and restarts,
|
||||||
|
context is lost. The bridge must track session state to detect this.
|
||||||
|
|
||||||
|
### Agent-side: session file
|
||||||
|
|
||||||
|
On startup, each Claude Code session writes its session ID to a
|
||||||
|
known file in its working directory:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Agent writes on boot (in CLAUDE.md startup hook or .bashrc)
import json
import os  # needed for os.path.expanduser below
import uuid
from datetime import datetime, timezone

# Identity record for this run. A new UUID is generated on every boot, which
# is what lets the bridge tell one session from the next.
session = {
    "id": str(uuid.uuid4()),
    # Timezone-aware UTC timestamp; datetime.utcnow() is naive and deprecated.
    "started_at": datetime.now(timezone.utc).isoformat(),
    "chat_id": TELEGRAM_CHAT_ID,  # Which chat this session serves
    "project": "homelab",  # or "felt", "sentry", etc.
}

with open(os.path.expanduser("~/.claude-session"), "w", encoding="utf-8") as f:
    json.dump(session, f)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Bridge-side: session awareness
|
||||||
|
|
||||||
|
The bridge reads the session file on every poll. If the session ID
|
||||||
|
changes between polls, it means the agent restarted:
|
||||||
|
|
||||||
|
```python
|
||||||
|
@dataclass
|
||||||
|
class SessionState:
|
||||||
|
session_id: str
|
||||||
|
started_at: datetime
|
||||||
|
chat_id: str
|
||||||
|
project: str
|
||||||
|
last_seen_id: str | None = None # Previous session for comparison
|
||||||
|
|
||||||
|
def check_session(self) -> tuple[SessionState, bool]:
|
||||||
|
"""Returns (current_session, has_restarted)."""
|
||||||
|
with open(os.path.expanduser("~/.claude-session")) as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
current = SessionState(**data)
|
||||||
|
restarted = (self.last_seen_id is not None
|
||||||
|
and self.last_seen_id != current.session_id)
|
||||||
|
self.last_seen_id = current.session_id
|
||||||
|
return current, restarted
|
||||||
|
```
|
||||||
|
|
||||||
|
### Restart notification
|
||||||
|
|
||||||
|
When a restart is detected, pull_updates includes a warning:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"updates": [
|
||||||
|
{
|
||||||
|
"message_id": null,
|
||||||
|
"content": "⚠️ Agent session restarted. Previous context lost. Session: abc-123 → def-456",
|
||||||
|
"created_at": "2026-03-30T08:15:00Z",
|
||||||
|
"type": "session_restart"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Persistent task state
|
||||||
|
|
||||||
|
The agent persists its current task state to disk so it can resume
|
||||||
|
after a restart instead of starting blind:
|
||||||
|
|
||||||
|
```
|
||||||
|
~/.claude-session-state.json
|
||||||
|
{
|
||||||
|
"session_id": "abc-123",
|
||||||
|
"current_task": "Fix nexus.mg DNS record",
|
||||||
|
"current_task_id": "msg-456",
|
||||||
|
"last_completed_step": 2,
|
||||||
|
"pending_messages": ["msg-789"],
|
||||||
|
"updated_at": "2026-03-30T01:15:00Z"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
On restart, the agent reads this file, announces what it was doing,
|
||||||
|
and asks the owner whether to resume or start fresh.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 12. Per-Chat Context Isolation
|
||||||
|
|
||||||
|
Each Telegram group chat maps to a separate Claude Code session with
|
||||||
|
its own context window, working directory, skills, and knowledge files.
|
||||||
|
|
||||||
|
### Chat → Session mapping
|
||||||
|
|
||||||
|
```python
|
||||||
|
CHAT_SESSIONS = {
|
||||||
|
"homelab": {
|
||||||
|
"chat_id": "<telegram-group-id-homelab>",
|
||||||
|
"workdir": "/home/mikkel/homelab",
|
||||||
|
"claude_md": "/home/mikkel/homelab/.claude/CLAUDE.md",
|
||||||
|
"session_file": "/home/mikkel/homelab/.claude-session",
|
||||||
|
},
|
||||||
|
"felt": {
|
||||||
|
"chat_id": "<telegram-group-id-felt>",
|
||||||
|
"workdir": "/home/mikkel/felt",
|
||||||
|
"claude_md": "/home/mikkel/felt/.claude/CLAUDE.md",
|
||||||
|
"session_file": "/home/mikkel/felt/.claude-session",
|
||||||
|
},
|
||||||
|
"sentry": {
|
||||||
|
"chat_id": "<telegram-group-id-sentry>",
|
||||||
|
"workdir": "/home/mikkel/sentry",
|
||||||
|
"claude_md": "/home/mikkel/sentry/.claude/CLAUDE.md",
|
||||||
|
"session_file": "/home/mikkel/sentry/.claude-session",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Queue is per-chat
|
||||||
|
|
||||||
|
Each chat has its own queue. Messages dispatched to "homelab" don't
|
||||||
|
appear in the "felt" queue. The MCP tools gain a `target` parameter:
|
||||||
|
|
||||||
|
```
|
||||||
|
send_message(target="homelab", message="Fix the DNS record")
|
||||||
|
send_message(target="felt", message="Update the Go backend")
|
||||||
|
pull_updates(target="homelab")
|
||||||
|
pull_updates(target="felt")
|
||||||
|
queue_status() # Returns status for ALL targets
|
||||||
|
```
|
||||||
|
|
||||||
|
### Session lifecycle per chat
|
||||||
|
|
||||||
|
Each Claude Code session is started with its project-specific context:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Homelab session
|
||||||
|
cd ~/homelab && claude --channels plugin:telegram@claude-plugins-official
|
||||||
|
|
||||||
|
# Felt session
|
||||||
|
cd ~/felt && claude --channels plugin:telegram@claude-plugins-official
|
||||||
|
|
||||||
|
# Sentry session
|
||||||
|
cd ~/sentry && claude --channels plugin:telegram@claude-plugins-official
|
||||||
|
```
|
||||||
|
|
||||||
|
Each session reads its own CLAUDE.md, has its own skills directory,
|
||||||
|
and only sees messages from its own Telegram group. Context stays
|
||||||
|
clean and focused.
|
||||||
|
|
||||||
|
### The bridge routes by target
|
||||||
|
|
||||||
|
```python
|
||||||
|
async def send_message(target: str, message: str) -> str:
    """Queue ``message`` for ``target`` and forward it to that target's chat.

    Returns a JSON string: ``{"queued": true, "id", "target"}`` on success,
    or ``{"queued": false, "reason"}`` for an unknown target or a duplicate.
    """
    config = CHAT_SESSIONS.get(target)
    if not config:
        rejection = {"queued": False, "reason": f"unknown target: {target}"}
        return json.dumps(rejection)

    queued = queues[target].enqueue(message)
    if queued is None:
        # A None result from enqueue() is reported to the caller as a duplicate.
        rejection = {"queued": False, "reason": "duplicate"}
        return json.dumps(rejection)

    await transport.send(chat_id=config["chat_id"], message=message)

    receipt = {"queued": True, "id": queued.id, "target": target}
    return json.dumps(receipt)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 13. Updated MCP Tools Summary
|
||||||
|
|
||||||
|
| Tool | Parameters | Returns |
|
||||||
|
|------|-----------|---------|
|
||||||
|
| `send_message` | `target`, `message` | `{queued, id, target}` or `{reason}` |
|
||||||
|
| `pull_updates` | `target`, `since?` | `{updates[], session_restart?}` |
|
||||||
|
| `queue_status` | — | Per-target: `{pending, in_progress, current_task, session_id}` |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 14. Agent System Prompt Addition (for CLAUDE.md)
|
||||||
|
|
||||||
|
```markdown
|
||||||
|
## Session Tracking
|
||||||
|
|
||||||
|
On startup:
|
||||||
|
1. Generate a session UUID
|
||||||
|
2. Write it to ~/.claude-session with timestamp and project name
|
||||||
|
3. Read ~/.claude-session-state.json if it exists
|
||||||
|
4. If resuming from a crash, announce:
|
||||||
|
"[Session restarted. Previous session was working on: <task>.
|
||||||
|
Resume or start fresh?]"
|
||||||
|
5. Persist task state to ~/.claude-session-state.json after every step
|
||||||
|
|
||||||
|
## Message Queue Awareness
|
||||||
|
|
||||||
|
You receive messages from multiple sources:
|
||||||
|
- Direct Telegram messages from Mikkel
|
||||||
|
- Queued tasks from claude.ai via the MCP bridge
|
||||||
|
|
||||||
|
When you receive a new message while working on a task:
|
||||||
|
1. Announce: "[New message received: <summary>]"
|
||||||
|
2. Assess urgency — URGENT or STOP = halt immediately
|
||||||
|
3. If non-urgent, acknowledge and continue current task
|
||||||
|
4. Address queued messages in order after current task completes
|
||||||
|
|
||||||
|
Always announce which message you're responding to:
|
||||||
|
"[Responding to: <first line of message>]"
|
||||||
|
```
|
||||||
|
|
@ -7,7 +7,7 @@ import signal
|
||||||
from .config import MCP_HOST, MCP_PORT, MEDIA_DIR, DB_PATH
|
from .config import MCP_HOST, MCP_PORT, MEDIA_DIR, DB_PATH
|
||||||
from .db import Database
|
from .db import Database
|
||||||
from .telegram_bot import BridgeBot
|
from .telegram_bot import BridgeBot
|
||||||
from .mcp_server import mcp, init as init_mcp
|
from .mcp_server import mcp, init as init_mcp, custom_routes
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO,
|
level=logging.INFO,
|
||||||
|
|
@ -21,6 +21,8 @@ async def run_telegram_bot(bot: BridgeBot):
|
||||||
"""Run the telegram bot polling loop."""
|
"""Run the telegram bot polling loop."""
|
||||||
app = bot.build_application()
|
app = bot.build_application()
|
||||||
await app.initialize()
|
await app.initialize()
|
||||||
|
# post_init isn't called automatically when we manually start
|
||||||
|
await bot._post_init(app)
|
||||||
await app.start()
|
await app.start()
|
||||||
updater = app.updater
|
updater = app.updater
|
||||||
await updater.start_polling(drop_pending_updates=True)
|
await updater.start_polling(drop_pending_updates=True)
|
||||||
|
|
@ -37,14 +39,17 @@ async def run_telegram_bot(bot: BridgeBot):
|
||||||
|
|
||||||
|
|
||||||
async def run_mcp_server():
|
async def run_mcp_server():
|
||||||
"""Run the FastMCP HTTP server using its built-in runner."""
|
"""Run the FastMCP HTTP server with custom API routes."""
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
|
# Get the FastMCP Starlette app and add our custom routes to it
|
||||||
|
mcp_app = mcp.http_app()
|
||||||
|
mcp_app.routes.extend(custom_routes)
|
||||||
|
|
||||||
logger.info(f"MCP server starting on {MCP_HOST}:{MCP_PORT}")
|
logger.info(f"MCP server starting on {MCP_HOST}:{MCP_PORT}")
|
||||||
await mcp.run_http_async(
|
config = uvicorn.Config(mcp_app, host=MCP_HOST, port=MCP_PORT, log_level="info")
|
||||||
host=MCP_HOST,
|
server = uvicorn.Server(config)
|
||||||
port=MCP_PORT,
|
await server.serve()
|
||||||
log_level="info",
|
|
||||||
show_banner=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
@ -73,7 +78,7 @@ async def main():
|
||||||
loop.add_signal_handler(sig, handle_signal)
|
loop.add_signal_handler(sig, handle_signal)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await asyncio.gather(telegram_task, mcp_task)
|
await asyncio.gather(telegram_task, mcp_task, return_exceptions=True)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,86 @@
|
||||||
"""FastMCP server exposing bridge tools to claude.ai."""
|
"""FastMCP server exposing bridge tools to claude.ai."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from fastmcp import FastMCP
|
from fastmcp import FastMCP
|
||||||
|
from starlette.requests import Request
|
||||||
|
from starlette.responses import JSONResponse
|
||||||
|
from starlette.routing import Route
|
||||||
|
|
||||||
from .db import Database
|
from .db import Database
|
||||||
from .config import get_group_chat_id
|
from .config import get_group_chat_id
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Will be initialized in __main__ with shared db instance
|
# Will be initialized in __main__ with shared db instance
|
||||||
db: Database | None = None
|
db: Database | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def ingest_message(request: Request) -> JSONResponse:
    """HTTP endpoint for local services to log messages into the bridge.

    POST /api/ingest
    {
        "telegram_message_id": 123,     # required
        "chat_id": -100...,             # required
        "sender_type": "homelab_bot",   # optional (defaults to "unknown")
        "sender_id": 8521598773,        # optional
        "sender_name": "Homelab Bot",   # optional
        "content": "message text",      # optional
        "reply_to_message_id": null,    # optional
        "has_attachment": false,        # optional (defaults to false)
        "created_at": "ISO8601"         # optional (defaults to now)
    }

    Responses:
        200 {"ok": true, "id": ...} when the message was stored.
        200 {"ok": true, "duplicate": true} when insert_message returns None.
        400 when the body is not parsable JSON, is not a JSON object, or
            lacks telegram_message_id / chat_id.
        503 when the shared database has not been initialized yet.
    """
    # db is a module global that starts out as None until it is initialized.
    if db is None:
        return JSONResponse({"error": "database not initialized"}, status_code=503)

    try:
        data = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)

    # Valid JSON may still be a list, string or number; .get() only exists on
    # objects, so reject anything else instead of crashing with a 500.
    if not isinstance(data, dict):
        return JSONResponse({"error": "JSON body must be an object"}, status_code=400)

    telegram_message_id = data.get("telegram_message_id")
    chat_id = data.get("chat_id")
    if not telegram_message_id or not chat_id:
        return JSONResponse(
            {"error": "telegram_message_id and chat_id are required"},
            status_code=400,
        )

    # "or" rather than a .get() default so that an explicit null also falls
    # back to the current time.
    created_at = data.get("created_at") or datetime.now(timezone.utc).isoformat()

    msg_id = db.insert_message(
        telegram_message_id=telegram_message_id,
        chat_id=chat_id,
        sender_type=data.get("sender_type", "unknown"),
        sender_id=data.get("sender_id"),
        sender_name=data.get("sender_name"),
        content=data.get("content"),
        reply_to_message_id=data.get("reply_to_message_id"),
        has_attachment=data.get("has_attachment", False),
        created_at=created_at,
    )

    if msg_id is None:
        return JSONResponse({"ok": True, "duplicate": True})

    logger.info(
        "Ingested message %s from %s",
        telegram_message_id,
        data.get("sender_name", "unknown"),
    )
    return JSONResponse({"ok": True, "id": msg_id})
|
||||||
|
|
||||||
|
|
||||||
|
async def health(request: Request) -> JSONResponse:
    """Health check endpoint.

    GET /api/health

    Returns 200 with ``{"status": "ok"}`` merged with whatever
    ``db.get_status()`` reports, or 503 when the shared database has not
    been initialized yet.
    """
    # db is a module global that starts out as None; report that state
    # explicitly instead of failing with an AttributeError.
    if db is None:
        return JSONResponse(
            {"status": "unavailable", "error": "database not initialized"},
            status_code=503,
        )

    return JSONResponse({"status": "ok", **db.get_status()})
|
||||||
|
|
||||||
|
|
||||||
|
# Custom routes added to the FastMCP app
|
||||||
|
custom_routes = [
|
||||||
|
Route("/api/ingest", ingest_message, methods=["POST"]),
|
||||||
|
Route("/api/health", health, methods=["GET"]),
|
||||||
|
]
|
||||||
|
|
||||||
mcp = FastMCP(
|
mcp = FastMCP(
|
||||||
name="homelab-bridge",
|
name="homelab-bridge",
|
||||||
instructions=(
|
instructions=(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue