diff --git a/server/src/app.ts b/server/src/app.ts index 8200ef83..54a21c87 100644 --- a/server/src/app.ts +++ b/server/src/app.ts @@ -26,6 +26,7 @@ import { instanceSettingsRoutes } from "./routes/instance-settings.js"; import { llmRoutes } from "./routes/llms.js"; import { assetRoutes } from "./routes/assets.js"; import { accessRoutes } from "./routes/access.js"; +import { agentChatRoutes } from "./routes/agent-chat.js"; import { pluginRoutes } from "./routes/plugins.js"; import { pluginUiStaticRoutes } from "./routes/plugin-ui-static.js"; import { applyUiBranding } from "./ui-branding.js"; @@ -137,6 +138,7 @@ export async function createApp( ); api.use("/companies", companyRoutes(db)); api.use(agentRoutes(db)); + api.use(agentChatRoutes(db)); api.use(assetRoutes(db, opts.storageService)); api.use(projectRoutes(db)); api.use(issueRoutes(db, opts.storageService)); diff --git a/server/src/routes/agent-chat.ts b/server/src/routes/agent-chat.ts new file mode 100644 index 00000000..150136a6 --- /dev/null +++ b/server/src/routes/agent-chat.ts @@ -0,0 +1,142 @@ +import { Router } from "express"; +import { randomUUID } from "node:crypto"; +import type { Db } from "@paperclipai/db"; +import { agents as agentsTable } from "@paperclipai/db"; +import { eq } from "drizzle-orm"; +import { getServerAdapter } from "../adapters/index.js"; +import { + agentService, + issueService, + secretService, +} from "../services/index.js"; +import { notFound } from "../errors.js"; +import { parseObject } from "../adapters/utils.js"; + +/** + * Chat relay endpoint — calls the adapter directly and streams the response + * back via SSE. Bypasses the heartbeat queue for real-time conversation. + * + * Comments are persisted normally so the conversation is durable. + */ +export function agentChatRoutes(db: Db) { + const router = Router(); + + router.post("/agents/:id/chat/relay", async (req, res) => { + const agentId = req.params.id; + const { taskId, message } = req.body as { taskId: string; message: string }; + + if (!taskId || !message) { + res.status(400).json({ error: "taskId and message are required" }); + return; + } + + // Look up agent + const agentSvc = agentService(db); + const agent = await db + .select() + .from(agentsTable) + .where(eq(agentsTable.id, agentId)) + .then((rows) => rows[0] ?? null); + + if (!agent) { + throw notFound("Agent not found"); + } + + // Save the user's message as a comment + const issueSvc = issueService(db); + await issueSvc.addComment(taskId, message, { + userId: (req as any).actor?.userId ?? null, + }); + + // Set up SSE streaming response + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }); + res.flushHeaders(); + + // Send initial event + res.write(`data: ${JSON.stringify({ type: "start", agentId, agentName: agent.name })}\n\n`); + + try { + // Resolve adapter config with secrets + const config = parseObject(agent.adapterConfig); + const secretsSvc = secretService(db); + const { config: resolvedConfig } = await secretsSvc.resolveAdapterConfigForRuntime( + agent.companyId, + config, + ); + + // Get adapter + const adapter = getServerAdapter(agent.adapterType); + + // Execute directly — stream stdout chunks as SSE events + let fullResponse = ""; + const startTime = Date.now(); + + const result = await adapter.execute({ + runId: randomUUID(), + agent: agent as any, // DB row matches adapter expectation + runtime: { + sessionId: null, + sessionParams: null, + sessionDisplayId: null, + taskKey: null, + }, + config: resolvedConfig, + context: { + chatMessage: message, + taskId, + issueId: taskId, + source: "chat_relay", + wakeReason: "chat_relay", + }, + onLog: async (stream, chunk) => { + if (stream === "stdout" && res.writable) { + fullResponse += chunk; + res.write(`data: ${JSON.stringify({ type: "chunk", text: chunk })}\n\n`); + } + }, + onMeta: async () => { + // Silently consume metadata + }, + }); + + // Save the agent's full response as a comment + if (fullResponse.trim()) { + await issueSvc.addComment(taskId, fullResponse.trim(), { + agentId: agent.id, + }); + } + + // Send completion event + const duration = Date.now() - startTime; + if (res.writable) { + res.write( + `data: ${JSON.stringify({ + type: "done", + model: result.model ?? null, + provider: result.provider ?? null, + costUsd: result.costUsd ?? null, + duration, + exitCode: result.exitCode, + })}\n\n`, + ); + } + } catch (err) { + // Send error event + if (res.writable) { + const message = err instanceof Error ? err.message : "Relay execution failed"; + res.write(`data: ${JSON.stringify({ type: "error", message })}\n\n`); + } + } finally { + if (res.writable) { + res.end(); + } + } + }); + + return router; +} diff --git a/ui/src/components/CEOChatPanel.tsx b/ui/src/components/CEOChatPanel.tsx index bb604e81..3a9ad9fe 100644 --- a/ui/src/components/CEOChatPanel.tsx +++ b/ui/src/components/CEOChatPanel.tsx @@ -286,33 +286,91 @@ export function CEOChatPanel({ } }, [comments, detectedPlanCommentId, ignoreBeforeCommentId, taskId, onPlanDetected, queryClient]); - // Send message + // Streaming response state + const [streamingText, setStreamingText] = useState(""); + + // Send message — try streaming relay first, fall back to poll-based const sendMessage = useCallback(async (body: string) => { const trimmed = body.trim(); if (!trimmed || sending) return; setSending(true); - try { - try { - await issuesApi.update(taskId, { assigneeUserId: null }); - } catch { /* may already be null */ } - try { - await issuesApi.update(taskId, { - assigneeAgentId: agentId, - status: "in_progress", - }); - } catch { /* may already be assigned */ } + setInput(""); + setOptimisticTyping(true); - await issuesApi.addComment(taskId, trimmed, true, true); - setInput(""); - setOptimisticTyping(true); // Show typing indicator immediately - const latestId = comments?.[comments.length - 1]?.id ?? null; - setIgnoreBeforeCommentId(latestId); - setDetectedPlanCommentId(null); + const latestId = comments?.[comments.length - 1]?.id ?? null; + setIgnoreBeforeCommentId(latestId); + setDetectedPlanCommentId(null); + + // Ensure task is assigned to agent + try { + await issuesApi.update(taskId, { assigneeUserId: null }); + } catch { /* ok */ } + try { + await issuesApi.update(taskId, { assigneeAgentId: agentId, status: "in_progress" }); + } catch { /* ok */ } + + try { + // Try streaming relay + const res = await fetch(`/api/agents/${agentId}/chat/relay`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ taskId, message: trimmed }), + }); + + if (!res.ok || !res.body) { + throw new Error("Relay not available"); + } + + setOptimisticTyping(false); + setStreamingText(""); + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + try { + const event = JSON.parse(line.slice(6)); + if (event.type === "chunk") { + setStreamingText((prev) => prev + event.text); + } else if (event.type === "done") { + setStreamingText(""); + // Refresh comments to pick up persisted messages + queryClient.invalidateQueries({ + queryKey: queryKeys.issues.comments(taskId), + }); + } else if (event.type === "error") { + setStreamingText(""); + // Fall through — comments will still be polled + } + } catch { /* malformed SSE line, skip */ } + } + } + + setStreamingText(""); + queryClient.invalidateQueries({ + queryKey: queryKeys.issues.comments(taskId), + }); + } catch { + // Fallback: use the old comment-and-poll approach + try { + await issuesApi.addComment(taskId, trimmed, true, true); + } catch { /* already saved by relay, or genuinely failed */ } queryClient.invalidateQueries({ queryKey: queryKeys.issues.comments(taskId), }); } finally { setSending(false); + setOptimisticTyping(false); inputRef.current?.focus(); } }, [sending, taskId, agentId, queryClient, comments]); @@ -604,6 +662,21 @@ export function CEOChatPanel({ )} + {/* Streaming response — shows text as it arrives from the relay */} + {streamingText && ( +