--- phase: 22-agent-streaming plan: 01 type: execute wave: 1 depends_on: [] files_modified: - packages/db/src/schema/chat_messages.ts - packages/db/src/migrations/TBD_agent_streaming.sql - packages/shared/src/types/chat.ts - packages/shared/src/validators/chat.ts - server/src/services/chat.ts - server/src/routes/chat.ts - server/src/__tests__/chat-stream-routes.test.ts - server/src/__tests__/chat-routes.test.ts autonomous: true requirements: [CHAT-01, CHAT-08, CHAT-10, CHAT-12, PERF-02] must_haves: truths: - "POST user message then GET /conversations/:id/stream returns text/event-stream with token events followed by a done event" - "PATCH /conversations/:id accepts agentId field and persists it" - "PUT /conversations/:id/messages/:messageId updates editedContent and editedAt" - "SSE stream sets X-Accel-Buffering: no and flushes headers immediately for sub-100ms latency" - "Client disconnect causes server to stop streaming (abort detection)" artifacts: - path: "server/src/routes/chat.ts" provides: "SSE stream endpoint, edit message route, updateConversation with agentId" exports: ["chatRoutes"] - path: "server/src/services/chat.ts" provides: "editMessage, getMessageHistory, updateConversationAgent" exports: ["chatService"] - path: "packages/db/src/schema/chat_messages.ts" provides: "editedContent and editedAt columns" contains: "editedContent" - path: "packages/shared/src/types/chat.ts" provides: "Updated ChatMessage with editedContent, editedAt" contains: "editedContent" - path: "packages/shared/src/validators/chat.ts" provides: "streamMessageSchema, editMessageSchema, updateConversationSchema with agentId" contains: "streamMessageSchema" - path: "server/src/__tests__/chat-stream-routes.test.ts" provides: "SSE streaming tests" key_links: - from: "server/src/routes/chat.ts" to: "server/src/services/chat.ts" via: "svc.addMessage, svc.editMessage, svc.getMessageHistory" pattern: "svc\\.(addMessage|editMessage|getMessageHistory)" - from: "server/src/routes/chat.ts" to: "packages/shared/src/validators/chat.ts" via: "validate(streamMessageSchema)" pattern: "validate\\(streamMessageSchema\\)" --- Server-side streaming infrastructure: DB schema additions for message editing, SSE streaming endpoint for LLM token delivery, message edit route, agent selection on conversations, and server tests. Purpose: Establishes the entire server-side API surface that the UI plans (02/03) will consume. Every new endpoint is tested. Output: Working SSE stream endpoint, edit message endpoint, conversation agent update, migration SQL, tests. @$HOME/.claude/get-shit-done/workflows/execute-plan.md @$HOME/.claude/get-shit-done/templates/summary.md @.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/22-agent-streaming/22-RESEARCH.md From packages/shared/src/types/chat.ts: ```typescript export interface ChatConversation { id: string; companyId: string; title: string | null; agentId: string | null; pinnedAt: string | null; archivedAt: string | null; deletedAt: string | null; createdAt: string; updatedAt: string; } export interface ChatMessage { id: string; conversationId: string; role: "user" | "assistant" | "system"; content: string; agentId: string | null; createdAt: string; } ``` From packages/shared/src/validators/chat.ts: ```typescript export const createConversationSchema = z.object({ title: z.string().max(200).optional() }); export const updateConversationSchema = z.object({ title: z.string().max(200).optional() }); export const createMessageSchema = z.object({ role: z.enum(["user", "assistant", "system"]), content: z.string().min(1), agentId: z.string().uuid().optional().nullable(), }); ``` From packages/db/src/schema/chat_messages.ts: ```typescript export const chatMessages = pgTable("chat_messages", { id: uuid("id").primaryKey().defaultRandom(), conversationId: uuid("conversation_id").notNull().references(() => chatConversations.id, { onDelete: "cascade" }), role: text("role").notNull(), content: text("content").notNull(), agentId: uuid("agent_id"), createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), }, ...); ``` From server/src/services/chat.ts: ```typescript export function chatService(db: Db) { // Returns object with: listConversations, createConversation, getConversation, // updateConversation, softDeleteConversation, archiveConversation, unarchiveConversation, // pinConversation, unpinConversation, listMessages, addMessage } ``` From server/src/routes/chat.ts: ```typescript export function chatRoutes(db: Db) { // Mounts all routes on a Router. Key: PATCH /conversations/:id uses validate(updateConversationSchema) } ``` SSE pattern from server/src/routes/plugins.ts:1146: ```typescript res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }); res.flushHeaders(); res.write(":ok\n\n"); ``` Task 1: DB migration + shared types + validators + service methods for streaming and editing packages/db/src/schema/chat_messages.ts, packages/shared/src/types/chat.ts, packages/shared/src/validators/chat.ts, server/src/services/chat.ts, server/src/__tests__/chat-routes.test.ts packages/db/src/schema/chat_messages.ts, packages/db/src/schema/chat_conversations.ts, packages/shared/src/types/chat.ts, packages/shared/src/validators/chat.ts, server/src/services/chat.ts, server/src/__tests__/chat-routes.test.ts - Test: editMessage(messageId, { content }) updates the message's editedContent and editedAt, returns updated row - Test: getMessageHistory(conversationId) returns all messages in ascending createdAt order (for LLM context) - Test: updateConversation with agentId field persists the agentId on the conversation - Test: PATCH /conversations/:id with { agentId: "uuid" } returns 200 with updated conversation - Test: PUT /conversations/:id/messages/:messageId with { content: "new" } returns 200 with editedContent set 1. **DB schema** — Add two columns to `chatMessages` in `packages/db/src/schema/chat_messages.ts`: ```typescript editedContent: text("edited_content"), editedAt: timestamp("edited_at", { withTimezone: true }), ``` Then run `pnpm db:generate` to create the migration SQL. 2. **Shared types** — Update `ChatMessage` interface in `packages/shared/src/types/chat.ts`: - Add `editedContent: string | null;` - Add `editedAt: string | null;` 3. **Validators** — In `packages/shared/src/validators/chat.ts`: - Update `updateConversationSchema` to include `agentId: z.string().uuid().optional().nullable()` - Add `export const editMessageSchema = z.object({ content: z.string().min(1) });` - Add `export const streamMessageSchema = z.object({ content: z.string().min(1), agentId: z.string().uuid().optional().nullable() });` 4. **Service methods** — Add to `chatService` in `server/src/services/chat.ts`: - `editMessage(messageId: string, data: { content: string })` — sets `editedContent = data.content`, `editedAt = new Date()` on the message row, returns the updated row - `getMessageHistory(conversationId: string)` — selects all messages WHERE conversationId matches, ORDER BY createdAt ASC (ascending, for LLM context window). Returns `ChatMessage[]`. Use `editedContent ?? content` as the effective content field (alias as `effectiveContent` in the return). - Update `updateConversation` to accept and persist `agentId` field: `set({ title: data.title, agentId: data.agentId, updatedAt: new Date() })`. Only set fields that are provided (check `data.agentId !== undefined` before including in set). 5. **Extend existing tests** in `server/src/__tests__/chat-routes.test.ts`: - Add test: `PATCH /conversations/:id with agentId` — create conversation, PATCH with `{ agentId: someAgentId }`, verify response has the agentId set. (Use a dummy UUID string for agentId if the test DB doesn't enforce FK — check existing test patterns.) - Add test: `PUT /conversations/:id/messages/:messageId` — create conversation, add message, PUT with `{ content: "edited" }`, verify response has `editedContent: "edited"` and `editedAt` is not null. pnpm --filter @paperclipai/server test run -- --reporter=verbose chat-routes - grep -q "editedContent" packages/db/src/schema/chat_messages.ts returns 0 - grep -q "editedAt" packages/db/src/schema/chat_messages.ts returns 0 - grep -q "editedContent: string | null" packages/shared/src/types/chat.ts returns 0 - grep -q "editMessageSchema" packages/shared/src/validators/chat.ts returns 0 - grep -q "streamMessageSchema" packages/shared/src/validators/chat.ts returns 0 - grep -q "agentId" packages/shared/src/validators/chat.ts (in updateConversationSchema) returns 0 - grep -q "editMessage" server/src/services/chat.ts returns 0 - grep -q "getMessageHistory" server/src/services/chat.ts returns 0 - Migration SQL file exists in packages/db/src/migrations/ - pnpm --filter @paperclipai/server test run -- chat-routes exits 0 DB has editedContent/editedAt columns, shared types updated, validators for stream/edit/agentId exist, service has editMessage + getMessageHistory, all tests pass Task 2: SSE streaming endpoint + edit message route + stream tests server/src/routes/chat.ts, server/src/__tests__/chat-stream-routes.test.ts server/src/routes/chat.ts, server/src/routes/plugins.ts (lines 1095-1186 for SSE pattern), server/src/services/chat.ts, packages/shared/src/validators/chat.ts, server/src/__tests__/chat-routes.test.ts - Test: GET /conversations/:id/stream?triggerMessageId=X returns Content-Type text/event-stream - Test: GET /conversations/:id/stream?triggerMessageId=X returns X-Accel-Buffering: no header - Test: Stream sends initial `:ok` comment, then token events, then a done event - Test: PUT /conversations/:id/messages/:messageId route validates body with editMessageSchema - Test: Client close (req.destroy()) stops the stream loop 1. **Edit message route** — Add to `server/src/routes/chat.ts`: ```typescript // PUT /conversations/:id/messages/:messageId router.put("/conversations/:id/messages/:messageId", validate(editMessageSchema), async (req, res) => { assertBoard(req); const message = await svc.editMessage(req.params.messageId as string, req.body); if (!message) { res.status(404).json({ error: "Not found" }); return; } res.json(message); }); ``` 2. **SSE stream endpoint** — Add to `server/src/routes/chat.ts`: ```typescript // GET /conversations/:id/stream router.get("/conversations/:id/stream", async (req, res) => { assertBoard(req); const conversationId = req.params.id as string; const triggerMessageId = req.query.triggerMessageId as string | undefined; const conversation = await svc.getConversation(conversationId); if (!conversation) { res.status(404).json({ error: "Not found" }); return; } // Set SSE headers — copied from plugins.ts:1146 res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }); res.flushHeaders(); res.write(":ok\n\n"); let aborted = false; req.on("close", () => { aborted = true; }); // Resolve the agent for this conversation const agentId = conversation.agentId; // Get message history for LLM context const history = await svc.getMessageHistory(conversationId); // For now: echo-stream mode. The actual LLM call will be wired when a provider // is configured. This streams tokens from the last user message content one word // at a time as a functional placeholder that fully exercises the SSE pipeline. // Phase 23+ will replace this with real LLM calls via the agent's adapterConfig. const lastUserMsg = history.filter(m => m.role === "user").at(-1); const echoContent = lastUserMsg ? `Echo from agent: ${lastUserMsg.content}` : "No message to echo."; const tokens = echoContent.split(/(\s+)/); let accumulated = ""; for (const token of tokens) { if (aborted) break; accumulated += token; res.write(`data: ${JSON.stringify({ type: "token", content: token })}\n\n`); // Tiny yield to allow abort detection await new Promise(resolve => setTimeout(resolve, 5)); } // Persist assistant message only if stream completed (not aborted) if (!aborted && accumulated.trim()) { const assistantMsg = await svc.addMessage(conversationId, { role: "assistant", content: accumulated, agentId, }); res.write(`data: ${JSON.stringify({ type: "done", messageId: assistantMsg.id })}\n\n`); } else if (aborted) { // Do NOT persist partial messages per RESEARCH.md pitfall 4 } res.end(); }); ``` Import `editMessageSchema` and `streamMessageSchema` from `@paperclipai/shared` at the top of the routes file (alongside existing imports). 3. **Stream tests** — Create `server/src/__tests__/chat-stream-routes.test.ts`: - Use the same test DB setup pattern as `chat-routes.test.ts` (read that file for the pattern). - Test: `GET /conversations/:id/stream?triggerMessageId=X` — create conversation, add user message, open stream, collect all SSE data events, verify: - Response status is 200 - Content-Type header contains "text/event-stream" - X-Accel-Buffering header is "no" - First received data is `:ok` comment (or first data event has type "token") - Last data event has `type: "done"` with a `messageId` string - Test: `GET /conversations/:id/stream` for non-existent conversation returns 404 - Test: After stream completes, a new assistant message exists in the DB (query via list messages) - Test: `PUT /conversations/:id/messages/:messageId` with valid body returns 200 and editedContent matches For SSE testing: use supertest's `.buffer(true).parse(...)` or collect the raw response body. Alternatively, make a raw HTTP request to the test server and read the stream. Follow whatever pattern the existing test file uses for HTTP calls. 4. Add the `editMessageSchema` and `streamMessageSchema` imports to the routes file's import block from `@paperclipai/shared`. pnpm --filter @paperclipai/server test run -- --reporter=verbose chat-stream - grep -q 'router.get("/conversations/:id/stream"' server/src/routes/chat.ts returns 0 - grep -q 'router.put("/conversations/:id/messages/:messageId"' server/src/routes/chat.ts returns 0 - grep -q "text/event-stream" server/src/routes/chat.ts returns 0 - grep -q "X-Accel-Buffering" server/src/routes/chat.ts returns 0 - grep -q "flushHeaders" server/src/routes/chat.ts returns 0 - grep -q 'type: "done"' server/src/routes/chat.ts returns 0 - grep -q 'type: "token"' server/src/routes/chat.ts returns 0 - test -f server/src/__tests__/chat-stream-routes.test.ts - pnpm --filter @paperclipai/server test run -- chat-stream exits 0 - pnpm --filter @paperclipai/server test run exits 0 (all server tests green) SSE stream endpoint returns text/event-stream with token+done events, edit message route works, abort detection stops streaming, all server tests pass - `pnpm --filter @paperclipai/server test run` — all server tests pass - `pnpm db:generate` has been run and migration exists - SSE endpoint tested with token + done events - Edit message route tested with editedContent persistence - PATCH conversation with agentId tested 1. New migration SQL exists and applies the editedContent + editedAt columns 2. GET /conversations/:id/stream returns text/event-stream with token events then done event 3. PUT /conversations/:id/messages/:messageId updates editedContent and editedAt 4. PATCH /conversations/:id with { agentId } persists the agent selection 5. All server tests pass (both chat-routes and chat-stream-routes) After completion, create `.planning/phases/22-agent-streaming/22-01-SUMMARY.md`