From fbd87da6d67fb8e30946917d9af106d498320e56 Mon Sep 17 00:00:00 2001 From: scotttong Date: Thu, 19 Mar 2026 17:02:10 -0700 Subject: [PATCH] experiment: add chat relay endpoint for real-time streaming responses New POST /api/agents/:id/chat/relay endpoint that calls the adapter directly and streams stdout back via SSE, bypassing the heartbeat queue. Comments are persisted normally so conversations stay durable. Frontend tries the relay first, falls back to poll-based flow if unavailable. Backend: 1 new file (agent-chat.ts), 1 line in app.ts. Frontend: streaming fetch in CEOChatPanel with fallback. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/src/app.ts | 2 + server/src/routes/agent-chat.ts | 142 +++++++++++++++++++++++++++++ ui/src/components/CEOChatPanel.tsx | 107 ++++++++++++++++++---- 3 files changed, 234 insertions(+), 17 deletions(-) create mode 100644 server/src/routes/agent-chat.ts diff --git a/server/src/app.ts b/server/src/app.ts index 8200ef83..54a21c87 100644 --- a/server/src/app.ts +++ b/server/src/app.ts @@ -26,6 +26,7 @@ import { instanceSettingsRoutes } from "./routes/instance-settings.js"; import { llmRoutes } from "./routes/llms.js"; import { assetRoutes } from "./routes/assets.js"; import { accessRoutes } from "./routes/access.js"; +import { agentChatRoutes } from "./routes/agent-chat.js"; import { pluginRoutes } from "./routes/plugins.js"; import { pluginUiStaticRoutes } from "./routes/plugin-ui-static.js"; import { applyUiBranding } from "./ui-branding.js"; @@ -137,6 +138,7 @@ export async function createApp( ); api.use("/companies", companyRoutes(db)); api.use(agentRoutes(db)); + api.use(agentChatRoutes(db)); api.use(assetRoutes(db, opts.storageService)); api.use(projectRoutes(db)); api.use(issueRoutes(db, opts.storageService)); diff --git a/server/src/routes/agent-chat.ts b/server/src/routes/agent-chat.ts new file mode 100644 index 00000000..150136a6 --- /dev/null +++ b/server/src/routes/agent-chat.ts @@ -0,0 +1,142 @@ +import { Router } from "express"; +import { randomUUID } from "node:crypto"; +import type { Db } from "@paperclipai/db"; +import { agents as agentsTable } from "@paperclipai/db"; +import { eq } from "drizzle-orm"; +import { getServerAdapter } from "../adapters/index.js"; +import { + agentService, + issueService, + secretService, +} from "../services/index.js"; +import { notFound } from "../errors.js"; +import { parseObject } from "../adapters/utils.js"; + +/** + * Chat relay endpoint — calls the adapter directly and streams the response + * back via SSE. Bypasses the heartbeat queue for real-time conversation. + * + * Comments are persisted normally so the conversation is durable. + */ +export function agentChatRoutes(db: Db) { + const router = Router(); + + router.post("/agents/:id/chat/relay", async (req, res) => { + const agentId = req.params.id; + const { taskId, message } = req.body as { taskId: string; message: string }; + + if (!taskId || !message) { + res.status(400).json({ error: "taskId and message are required" }); + return; + } + + // Look up agent + const agentSvc = agentService(db); + const agent = await db + .select() + .from(agentsTable) + .where(eq(agentsTable.id, agentId)) + .then((rows) => rows[0] ?? null); + + if (!agent) { + throw notFound("Agent not found"); + } + + // Save the user's message as a comment + const issueSvc = issueService(db); + await issueSvc.addComment(taskId, message, { + userId: (req as any).actor?.userId ?? null, + }); + + // Set up SSE streaming response + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }); + res.flushHeaders(); + + // Send initial event + res.write(`data: ${JSON.stringify({ type: "start", agentId, agentName: agent.name })}\n\n`); + + try { + // Resolve adapter config with secrets + const config = parseObject(agent.adapterConfig); + const secretsSvc = secretService(db); + const { config: resolvedConfig } = await secretsSvc.resolveAdapterConfigForRuntime( + agent.companyId, + config, + ); + + // Get adapter + const adapter = getServerAdapter(agent.adapterType); + + // Execute directly — stream stdout chunks as SSE events + let fullResponse = ""; + const startTime = Date.now(); + + const result = await adapter.execute({ + runId: randomUUID(), + agent: agent as any, // DB row matches adapter expectation + runtime: { + sessionId: null, + sessionParams: null, + sessionDisplayId: null, + taskKey: null, + }, + config: resolvedConfig, + context: { + chatMessage: message, + taskId, + issueId: taskId, + source: "chat_relay", + wakeReason: "chat_relay", + }, + onLog: async (stream, chunk) => { + if (stream === "stdout" && res.writable) { + fullResponse += chunk; + res.write(`data: ${JSON.stringify({ type: "chunk", text: chunk })}\n\n`); + } + }, + onMeta: async () => { + // Silently consume metadata + }, + }); + + // Save the agent's full response as a comment + if (fullResponse.trim()) { + await issueSvc.addComment(taskId, fullResponse.trim(), { + agentId: agent.id, + }); + } + + // Send completion event + const duration = Date.now() - startTime; + if (res.writable) { + res.write( + `data: ${JSON.stringify({ + type: "done", + model: result.model ?? null, + provider: result.provider ?? null, + costUsd: result.costUsd ?? null, + duration, + exitCode: result.exitCode, + })}\n\n`, + ); + } + } catch (err) { + // Send error event + if (res.writable) { + const message = err instanceof Error ? err.message : "Relay execution failed"; + res.write(`data: ${JSON.stringify({ type: "error", message })}\n\n`); + } + } finally { + if (res.writable) { + res.end(); + } + } + }); + + return router; +} diff --git a/ui/src/components/CEOChatPanel.tsx b/ui/src/components/CEOChatPanel.tsx index bb604e81..3a9ad9fe 100644 --- a/ui/src/components/CEOChatPanel.tsx +++ b/ui/src/components/CEOChatPanel.tsx @@ -286,33 +286,91 @@ export function CEOChatPanel({ } }, [comments, detectedPlanCommentId, ignoreBeforeCommentId, taskId, onPlanDetected, queryClient]); - // Send message + // Streaming response state + const [streamingText, setStreamingText] = useState(""); + + // Send message — try streaming relay first, fall back to poll-based const sendMessage = useCallback(async (body: string) => { const trimmed = body.trim(); if (!trimmed || sending) return; setSending(true); - try { - try { - await issuesApi.update(taskId, { assigneeUserId: null }); - } catch { /* may already be null */ } - try { - await issuesApi.update(taskId, { - assigneeAgentId: agentId, - status: "in_progress", - }); - } catch { /* may already be assigned */ } + setInput(""); + setOptimisticTyping(true); - await issuesApi.addComment(taskId, trimmed, true, true); - setInput(""); - setOptimisticTyping(true); // Show typing indicator immediately - const latestId = comments?.[comments.length - 1]?.id ?? null; - setIgnoreBeforeCommentId(latestId); - setDetectedPlanCommentId(null); + const latestId = comments?.[comments.length - 1]?.id ?? null; + setIgnoreBeforeCommentId(latestId); + setDetectedPlanCommentId(null); + + // Ensure task is assigned to agent + try { + await issuesApi.update(taskId, { assigneeUserId: null }); + } catch { /* ok */ } + try { + await issuesApi.update(taskId, { assigneeAgentId: agentId, status: "in_progress" }); + } catch { /* ok */ } + + try { + // Try streaming relay + const res = await fetch(`/api/agents/${agentId}/chat/relay`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ taskId, message: trimmed }), + }); + + if (!res.ok || !res.body) { + throw new Error("Relay not available"); + } + + setOptimisticTyping(false); + setStreamingText(""); + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + try { + const event = JSON.parse(line.slice(6)); + if (event.type === "chunk") { + setStreamingText((prev) => prev + event.text); + } else if (event.type === "done") { + setStreamingText(""); + // Refresh comments to pick up persisted messages + queryClient.invalidateQueries({ + queryKey: queryKeys.issues.comments(taskId), + }); + } else if (event.type === "error") { + setStreamingText(""); + // Fall through — comments will still be polled + } + } catch { /* malformed SSE line, skip */ } + } + } + + setStreamingText(""); + queryClient.invalidateQueries({ + queryKey: queryKeys.issues.comments(taskId), + }); + } catch { + // Fallback: use the old comment-and-poll approach + try { + await issuesApi.addComment(taskId, trimmed, true, true); + } catch { /* already saved by relay, or genuinely failed */ } queryClient.invalidateQueries({ queryKey: queryKeys.issues.comments(taskId), }); } finally { setSending(false); + setOptimisticTyping(false); inputRef.current?.focus(); } }, [sending, taskId, agentId, queryClient, comments]); @@ -604,6 +662,21 @@ export function CEOChatPanel({ )} + {/* Streaming response — shows text as it arrives from the relay */} + {streamingText && ( +
+
+ + {agentName} + + streaming +
+
+ {streamingText} +
+
+ )} + {/* Optimistic typing indicator — shows immediately after user sends */} {optimisticTyping && !showStatus && (