feat(22-01): add SSE streaming endpoint and edit/truncate service methods
- Add editMessage, truncateMessagesAfter, streamEcho methods to chatService - Add POST /conversations/:id/stream SSE endpoint with flushHeaders before loop (PERF-02) - Add PATCH /conversations/:id/messages/:msgId for message editing - Add DELETE /conversations/:id/messages/after/:msgId for message truncation - Import gt from drizzle-orm for createdAt comparison in truncateMessagesAfter - Guard all res.write() calls with res.writable check (prevents write-after-end)
This commit is contained in:
parent
5f646b7d9b
commit
fcc143cc44
2 changed files with 103 additions and 1 deletions
|
|
@ -77,5 +77,72 @@ export function chatRoutes(db: Db): Router {
|
||||||
res.status(201).json(message);
|
res.status(201).json(message);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// POST /api/conversations/:id/stream -- SSE streaming endpoint (CHAT-01, PERF-02)
|
||||||
|
router.post("/conversations/:id/stream", async (req, res) => {
|
||||||
|
assertBoard(req);
|
||||||
|
const { content, agentId } = req.body;
|
||||||
|
if (!content || typeof content !== "string") {
|
||||||
|
res.status(400).json({ error: "content is required" });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set SSE headers and flush BEFORE any generation (PERF-02)
|
||||||
|
res.setHeader("Content-Type", "text/event-stream");
|
||||||
|
res.setHeader("Cache-Control", "no-cache");
|
||||||
|
res.setHeader("Connection", "keep-alive");
|
||||||
|
res.setHeader("X-Accel-Buffering", "no");
|
||||||
|
res.flushHeaders();
|
||||||
|
res.write(":ok\n\n");
|
||||||
|
|
||||||
|
const abort = new AbortController();
|
||||||
|
req.on("close", () => abort.abort());
|
||||||
|
|
||||||
|
try {
|
||||||
|
let fullContent = "";
|
||||||
|
for await (const token of svc.streamEcho(content, abort.signal)) {
|
||||||
|
if (!res.writable) break;
|
||||||
|
fullContent += token;
|
||||||
|
res.write(`data: ${JSON.stringify({ token })}\n\n`);
|
||||||
|
}
|
||||||
|
if (res.writable && !abort.signal.aborted) {
|
||||||
|
const message = await svc.addMessage(req.params.id!, {
|
||||||
|
role: "assistant",
|
||||||
|
content: fullContent.trim(),
|
||||||
|
agentId: agentId || undefined,
|
||||||
|
});
|
||||||
|
res.write(`data: ${JSON.stringify({ done: true, messageId: message.id, content: fullContent.trim() })}\n\n`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
if (res.writable && !abort.signal.aborted) {
|
||||||
|
res.write(`data: ${JSON.stringify({ error: "Stream error" })}\n\n`);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// PATCH /api/conversations/:id/messages/:msgId -- Edit message content
|
||||||
|
router.patch("/conversations/:id/messages/:msgId", async (req, res) => {
|
||||||
|
assertBoard(req);
|
||||||
|
const { content } = req.body;
|
||||||
|
if (!content || typeof content !== "string") {
|
||||||
|
res.status(400).json({ error: "content is required" });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const message = await svc.editMessage(req.params.msgId!, content);
|
||||||
|
if (!message) {
|
||||||
|
res.status(404).json({ error: "Message not found" });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
res.json(message);
|
||||||
|
});
|
||||||
|
|
||||||
|
// DELETE /api/conversations/:id/messages/after/:msgId -- Truncate messages after a given message
|
||||||
|
router.delete("/conversations/:id/messages/after/:msgId", async (req, res) => {
|
||||||
|
assertBoard(req);
|
||||||
|
await svc.truncateMessagesAfter(req.params.id!, req.params.msgId!);
|
||||||
|
res.status(204).end();
|
||||||
|
});
|
||||||
|
|
||||||
return router;
|
return router;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
import { and, desc, eq, ilike, isNull, lt } from "drizzle-orm";
|
import { and, desc, eq, gt, ilike, isNull, lt } from "drizzle-orm";
|
||||||
import type { Db } from "@paperclipai/db";
|
import type { Db } from "@paperclipai/db";
|
||||||
import { chatConversations, chatMessages } from "@paperclipai/db";
|
import { chatConversations, chatMessages } from "@paperclipai/db";
|
||||||
import { notFound } from "../errors.js";
|
import { notFound } from "../errors.js";
|
||||||
|
|
@ -165,5 +165,40 @@ export function chatService(db: Db) {
|
||||||
|
|
||||||
return message!;
|
return message!;
|
||||||
},
|
},
|
||||||
|
|
||||||
|
async editMessage(messageId: string, content: string) {
|
||||||
|
const [row] = await db
|
||||||
|
.update(chatMessages)
|
||||||
|
.set({ content })
|
||||||
|
.where(eq(chatMessages.id, messageId))
|
||||||
|
.returning();
|
||||||
|
return row;
|
||||||
|
},
|
||||||
|
|
||||||
|
async truncateMessagesAfter(conversationId: string, messageId: string) {
|
||||||
|
// Get the target message's createdAt
|
||||||
|
const [target] = await db
|
||||||
|
.select({ createdAt: chatMessages.createdAt })
|
||||||
|
.from(chatMessages)
|
||||||
|
.where(eq(chatMessages.id, messageId));
|
||||||
|
if (!target) return;
|
||||||
|
await db
|
||||||
|
.delete(chatMessages)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(chatMessages.conversationId, conversationId),
|
||||||
|
gt(chatMessages.createdAt, target.createdAt),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
async *streamEcho(content: string, signal: AbortSignal) {
|
||||||
|
const words = content.split(/\s+/);
|
||||||
|
for (const word of words) {
|
||||||
|
if (signal.aborted) break;
|
||||||
|
await new Promise((r) => setTimeout(r, 50));
|
||||||
|
yield word + " ";
|
||||||
|
}
|
||||||
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue