From ea1a176b67506ebc5e1c340c406f2d356350ffa2 Mon Sep 17 00:00:00 2001 From: Nexus Dev Date: Wed, 1 Apr 2026 18:06:32 +0000 Subject: [PATCH] feat(22-01): add SSE streaming endpoint and edit/truncate service methods - Add editMessage, truncateMessagesAfter, streamEcho methods to chatService - Add POST /conversations/:id/stream SSE endpoint with flushHeaders before loop (PERF-02) - Add PATCH /conversations/:id/messages/:msgId for message editing - Add DELETE /conversations/:id/messages/after/:msgId for message truncation - Import gt from drizzle-orm for createdAt comparison in truncateMessagesAfter - Guard all res.write() calls with res.writable check (prevents write-after-end) --- server/src/routes/chat.ts | 67 +++++++++++++++++++++++++++++++++++++ server/src/services/chat.ts | 37 +++++++++++++++++++- 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/server/src/routes/chat.ts b/server/src/routes/chat.ts index 605d1d58..1dbf27e7 100644 --- a/server/src/routes/chat.ts +++ b/server/src/routes/chat.ts @@ -77,5 +77,72 @@ export function chatRoutes(db: Db): Router { res.status(201).json(message); }); + // POST /api/conversations/:id/stream -- SSE streaming endpoint (CHAT-01, PERF-02) + router.post("/conversations/:id/stream", async (req, res) => { + assertBoard(req); + const { content, agentId } = req.body; + if (!content || typeof content !== "string") { + res.status(400).json({ error: "content is required" }); + return; + } + + // Set SSE headers and flush BEFORE any generation (PERF-02) + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders(); + res.write(":ok\n\n"); + + const abort = new AbortController(); + req.on("close", () => abort.abort()); + + try { + let fullContent = ""; + for await (const token of svc.streamEcho(content, abort.signal)) { + if (!res.writable) break; + fullContent += token; + res.write(`data: ${JSON.stringify({ token })}\n\n`); + } + if (res.writable && !abort.signal.aborted) { + const message = await svc.addMessage(req.params.id!, { + role: "assistant", + content: fullContent.trim(), + agentId: agentId || undefined, + }); + res.write(`data: ${JSON.stringify({ done: true, messageId: message.id, content: fullContent.trim() })}\n\n`); + } + } catch (err) { + if (res.writable && !abort.signal.aborted) { + res.write(`data: ${JSON.stringify({ error: "Stream error" })}\n\n`); + } + } finally { + res.end(); + } + }); + + // PATCH /api/conversations/:id/messages/:msgId -- Edit message content + router.patch("/conversations/:id/messages/:msgId", async (req, res) => { + assertBoard(req); + const { content } = req.body; + if (!content || typeof content !== "string") { + res.status(400).json({ error: "content is required" }); + return; + } + const message = await svc.editMessage(req.params.msgId!, content); + if (!message) { + res.status(404).json({ error: "Message not found" }); + return; + } + res.json(message); + }); + + // DELETE /api/conversations/:id/messages/after/:msgId -- Truncate messages after a given message + router.delete("/conversations/:id/messages/after/:msgId", async (req, res) => { + assertBoard(req); + await svc.truncateMessagesAfter(req.params.id!, req.params.msgId!); + res.status(204).end(); + }); + return router; } diff --git a/server/src/services/chat.ts b/server/src/services/chat.ts index 1580054a..9bf8d002 100644 --- a/server/src/services/chat.ts +++ b/server/src/services/chat.ts @@ -1,4 +1,4 @@ -import { and, desc, eq, ilike, isNull, lt } from "drizzle-orm"; +import { and, desc, eq, gt, ilike, isNull, lt } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { chatConversations, chatMessages } from "@paperclipai/db"; import { notFound } from "../errors.js"; @@ -165,5 +165,40 @@ export function chatService(db: Db) { return message!; }, + + async editMessage(messageId: string, content: string) { + const [row] = await db + .update(chatMessages) + .set({ content }) + .where(eq(chatMessages.id, messageId)) + .returning(); + return row; + }, + + async truncateMessagesAfter(conversationId: string, messageId: string) { + // Get the target message's createdAt + const [target] = await db + .select({ createdAt: chatMessages.createdAt }) + .from(chatMessages) + .where(eq(chatMessages.id, messageId)); + if (!target) return; + await db + .delete(chatMessages) + .where( + and( + eq(chatMessages.conversationId, conversationId), + gt(chatMessages.createdAt, target.createdAt), + ), + ); + }, + + async *streamEcho(content: string, signal: AbortSignal) { + const words = content.split(/\s+/); + for (const word of words) { + if (signal.aborted) break; + await new Promise((r) => setTimeout(r, 50)); + yield word + " "; + } + }, }; }