experiment: add chat relay endpoint for real-time streaming responses

New POST /api/agents/:id/chat/relay endpoint that calls the adapter
directly and streams stdout back via SSE, bypassing the heartbeat
queue. Comments are persisted normally so conversations stay durable.
Frontend tries the relay first, falls back to poll-based flow if
unavailable.

Backend: 1 new file (agent-chat.ts), 1 line in app.ts.
Frontend: streaming fetch in CEOChatPanel with fallback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
scotttong 2026-03-19 17:02:10 -07:00
parent 2d8003d2f5
commit fbd87da6d6
3 changed files with 234 additions and 17 deletions

View file

@ -26,6 +26,7 @@ import { instanceSettingsRoutes } from "./routes/instance-settings.js";
import { llmRoutes } from "./routes/llms.js";
import { assetRoutes } from "./routes/assets.js";
import { accessRoutes } from "./routes/access.js";
import { agentChatRoutes } from "./routes/agent-chat.js";
import { pluginRoutes } from "./routes/plugins.js";
import { pluginUiStaticRoutes } from "./routes/plugin-ui-static.js";
import { applyUiBranding } from "./ui-branding.js";
@ -137,6 +138,7 @@ export async function createApp(
);
api.use("/companies", companyRoutes(db));
api.use(agentRoutes(db));
api.use(agentChatRoutes(db));
api.use(assetRoutes(db, opts.storageService));
api.use(projectRoutes(db));
api.use(issueRoutes(db, opts.storageService));

View file

@ -0,0 +1,142 @@
import { Router } from "express";
import { randomUUID } from "node:crypto";
import type { Db } from "@paperclipai/db";
import { agents as agentsTable } from "@paperclipai/db";
import { eq } from "drizzle-orm";
import { getServerAdapter } from "../adapters/index.js";
import {
agentService,
issueService,
secretService,
} from "../services/index.js";
import { notFound } from "../errors.js";
import { parseObject } from "../adapters/utils.js";
/**
* Chat relay endpoint calls the adapter directly and streams the response
* back via SSE. Bypasses the heartbeat queue for real-time conversation.
*
* Comments are persisted normally so the conversation is durable.
*/
export function agentChatRoutes(db: Db) {
const router = Router();
router.post("/agents/:id/chat/relay", async (req, res) => {
const agentId = req.params.id;
const { taskId, message } = req.body as { taskId: string; message: string };
if (!taskId || !message) {
res.status(400).json({ error: "taskId and message are required" });
return;
}
// Look up agent
const agentSvc = agentService(db);
const agent = await db
.select()
.from(agentsTable)
.where(eq(agentsTable.id, agentId))
.then((rows) => rows[0] ?? null);
if (!agent) {
throw notFound("Agent not found");
}
// Save the user's message as a comment
const issueSvc = issueService(db);
await issueSvc.addComment(taskId, message, {
userId: (req as any).actor?.userId ?? null,
});
// Set up SSE streaming response
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
});
res.flushHeaders();
// Send initial event
res.write(`data: ${JSON.stringify({ type: "start", agentId, agentName: agent.name })}\n\n`);
try {
// Resolve adapter config with secrets
const config = parseObject(agent.adapterConfig);
const secretsSvc = secretService(db);
const { config: resolvedConfig } = await secretsSvc.resolveAdapterConfigForRuntime(
agent.companyId,
config,
);
// Get adapter
const adapter = getServerAdapter(agent.adapterType);
// Execute directly — stream stdout chunks as SSE events
let fullResponse = "";
const startTime = Date.now();
const result = await adapter.execute({
runId: randomUUID(),
agent: agent as any, // DB row matches adapter expectation
runtime: {
sessionId: null,
sessionParams: null,
sessionDisplayId: null,
taskKey: null,
},
config: resolvedConfig,
context: {
chatMessage: message,
taskId,
issueId: taskId,
source: "chat_relay",
wakeReason: "chat_relay",
},
onLog: async (stream, chunk) => {
if (stream === "stdout" && res.writable) {
fullResponse += chunk;
res.write(`data: ${JSON.stringify({ type: "chunk", text: chunk })}\n\n`);
}
},
onMeta: async () => {
// Silently consume metadata
},
});
// Save the agent's full response as a comment
if (fullResponse.trim()) {
await issueSvc.addComment(taskId, fullResponse.trim(), {
agentId: agent.id,
});
}
// Send completion event
const duration = Date.now() - startTime;
if (res.writable) {
res.write(
`data: ${JSON.stringify({
type: "done",
model: result.model ?? null,
provider: result.provider ?? null,
costUsd: result.costUsd ?? null,
duration,
exitCode: result.exitCode,
})}\n\n`,
);
}
} catch (err) {
// Send error event
if (res.writable) {
const message = err instanceof Error ? err.message : "Relay execution failed";
res.write(`data: ${JSON.stringify({ type: "error", message })}\n\n`);
}
} finally {
if (res.writable) {
res.end();
}
}
});
return router;
}

View file

@ -286,33 +286,91 @@ export function CEOChatPanel({
}
}, [comments, detectedPlanCommentId, ignoreBeforeCommentId, taskId, onPlanDetected, queryClient]);
// Send message
// Streaming response state
const [streamingText, setStreamingText] = useState("");
// Send message — try streaming relay first, fall back to poll-based
const sendMessage = useCallback(async (body: string) => {
const trimmed = body.trim();
if (!trimmed || sending) return;
setSending(true);
try {
try {
await issuesApi.update(taskId, { assigneeUserId: null });
} catch { /* may already be null */ }
try {
await issuesApi.update(taskId, {
assigneeAgentId: agentId,
status: "in_progress",
});
} catch { /* may already be assigned */ }
setInput("");
setOptimisticTyping(true);
await issuesApi.addComment(taskId, trimmed, true, true);
setInput("");
setOptimisticTyping(true); // Show typing indicator immediately
const latestId = comments?.[comments.length - 1]?.id ?? null;
setIgnoreBeforeCommentId(latestId);
setDetectedPlanCommentId(null);
const latestId = comments?.[comments.length - 1]?.id ?? null;
setIgnoreBeforeCommentId(latestId);
setDetectedPlanCommentId(null);
// Ensure task is assigned to agent
try {
await issuesApi.update(taskId, { assigneeUserId: null });
} catch { /* ok */ }
try {
await issuesApi.update(taskId, { assigneeAgentId: agentId, status: "in_progress" });
} catch { /* ok */ }
try {
// Try streaming relay
const res = await fetch(`/api/agents/${agentId}/chat/relay`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ taskId, message: trimmed }),
});
if (!res.ok || !res.body) {
throw new Error("Relay not available");
}
setOptimisticTyping(false);
setStreamingText("");
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
try {
const event = JSON.parse(line.slice(6));
if (event.type === "chunk") {
setStreamingText((prev) => prev + event.text);
} else if (event.type === "done") {
setStreamingText("");
// Refresh comments to pick up persisted messages
queryClient.invalidateQueries({
queryKey: queryKeys.issues.comments(taskId),
});
} else if (event.type === "error") {
setStreamingText("");
// Fall through — comments will still be polled
}
} catch { /* malformed SSE line, skip */ }
}
}
setStreamingText("");
queryClient.invalidateQueries({
queryKey: queryKeys.issues.comments(taskId),
});
} catch {
// Fallback: use the old comment-and-poll approach
try {
await issuesApi.addComment(taskId, trimmed, true, true);
} catch { /* already saved by relay, or genuinely failed */ }
queryClient.invalidateQueries({
queryKey: queryKeys.issues.comments(taskId),
});
} finally {
setSending(false);
setOptimisticTyping(false);
inputRef.current?.focus();
}
}, [sending, taskId, agentId, queryClient, comments]);
@ -604,6 +662,21 @@ export function CEOChatPanel({
</span>
</button>
)}
{/* Streaming response — shows text as it arrives from the relay */}
{streamingText && (
<div className="rounded-md px-2.5 py-1.5 text-[13px] leading-relaxed bg-muted/50 border border-border mr-6 animate-in fade-in duration-150">
<div className="flex items-center gap-1.5 mb-0.5">
<span className="text-[10px] font-medium uppercase tracking-wide text-muted-foreground">
{agentName}
</span>
<span className="text-[10px] text-cyan-500 font-medium">streaming</span>
</div>
<div className="prose prose-xs dark:prose-invert max-w-none text-[13px] [&>*:first-child]:mt-0 [&>*:last-child]:mb-0">
<MarkdownBody>{streamingText}</MarkdownBody>
</div>
</div>
)}
{/* Optimistic typing indicator — shows immediately after user sends */}
{optimisticTyping && !showStatus && (
<div className="flex items-center gap-2 text-[12px] text-muted-foreground px-3 py-1.5">