---
phase: 22-agent-streaming
plan: "01"
type: execute
wave: 1
depends_on: ["22-00"]
files_modified:
  - server/src/services/chat.ts
  - server/src/routes/chat.ts
  - ui/src/hooks/useStreamingChat.ts
  - ui/src/hooks/useStreamingChat.test.ts
  - ui/src/api/chat.ts
autonomous: true
requirements:
  - CHAT-01
  - CHAT-12
  - PERF-02
must_haves:
  truths:
    - "Server SSE endpoint streams token events as text/event-stream"
    - "Client hook accumulates tokens into streamingContent string"
    - "User can stop a stream mid-generation and partial content is preserved"
    - "First SSE headers are flushed before any LLM generation begins"
  artifacts:
    - path: "server/src/routes/chat.ts"
      provides: "POST /conversations/:id/stream SSE endpoint"
      contains: "text/event-stream"
    - path: "server/src/services/chat.ts"
      provides: "editMessage and truncateMessagesAfter methods"
      contains: "editMessage"
    - path: "ui/src/hooks/useStreamingChat.ts"
      provides: "SSE lifecycle hook"
      exports: ["useStreamingChat"]
    - path: "ui/src/api/chat.ts"
      provides: "postMessageAndStream method"
      contains: "postMessageAndStream"
  key_links:
    - from: "ui/src/hooks/useStreamingChat.ts"
      to: "server POST /conversations/:id/stream"
      via: "fetch with ReadableStream"
      pattern: "fetch.*stream"
    - from: "server/src/routes/chat.ts"
      to: "server/src/services/chat.ts"
      via: "svc.addMessage for final commit"
      pattern: "svc\\.addMessage"
---

SSE streaming endpoint on the server and `useStreamingChat` hook on the client. The server uses a stub echo stream (repeats user message as fake tokens) since real LLM integration is Phase 23. The client uses `fetch` with `ReadableStream` (not native `EventSource`) because the stream endpoint is POST-based.

Purpose: Enable real-time token-by-token message delivery (CHAT-01), stop generation (CHAT-12), and sub-100ms first-token latency (PERF-02).

Output: Server SSE route, chat service edit/truncate methods, client streaming hook, updated chat API client.
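Reading the stream with `fetch` rather than `EventSource` means the client has to split raw chunks into SSE frames itself, and a chunk can end in the middle of a frame. The sketch below isolates that buffering step as a pure function so it can be reasoned about on its own. The names `StreamEvent` and `parseSseChunk` are hypothetical and not part of this plan's files; the payload fields mirror the `token` / `done` / `error` events described in this plan.

```typescript
// Hypothetical payload shape, mirroring the events this plan streams.
type StreamEvent = {
  token?: string;
  done?: boolean;
  messageId?: string;
  content?: string;
  error?: string;
};

// Combine the carried-over buffer with a newly decoded chunk, return every
// complete `data: ` line as a parsed event, and hand back the unfinished
// tail so the caller can prepend it to the next chunk.
function parseSseChunk(
  buffer: string,
  chunk: string,
): { events: StreamEvent[]; rest: string } {
  const lines = (buffer + chunk).split("\n");
  const rest = lines.pop() ?? ""; // last piece may be an incomplete line
  const events: StreamEvent[] = [];
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue; // skips ":ok" comments and blank separators
    try {
      events.push(JSON.parse(line.slice(6)) as StreamEvent);
    } catch {
      // Malformed JSON on a data line is ignored, matching the plan's client behaviour.
    }
  }
  return { events, rest };
}

// A frame split across two network chunks is only emitted once it is complete.
const first = parseSseChunk("", ':ok\n\ndata: {"tok');
const second = parseSseChunk(first.rest, 'en":"Hi "}\n\n');
console.log(first.events.length, second.events[0]?.token);
```

Keeping the remainder in `rest` is what makes the parser safe against arbitrary chunk boundaries; the only assumption is that each event fits on a single `data: ` line, which holds when payloads are serialized with `JSON.stringify`.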
@$HOME/.claude/get-shit-done/workflows/execute-plan.md
@$HOME/.claude/get-shit-done/templates/summary.md

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/22-agent-streaming/22-RESEARCH.md
@.planning/phases/22-agent-streaming/22-00-SUMMARY.md

From server/src/routes/chat.ts (existing):

```typescript
export function chatRoutes(db: Db): Router {
  const router = Router();
  const svc = chatService(db);
  // ... existing routes: POST /conversations, GET /conversations/:id, etc.
}
```

From server/src/services/chat.ts (existing):

```typescript
export function chatService(db: Db) {
  return {
    createConversation(...),
    listConversations(...),
    getConversation(...),
    updateConversation(...),
    softDeleteConversation(...),
    listMessages(...),
    addMessage(...)
  };
}
```

From server/src/routes/plugins.ts (SSE pattern):

```typescript
res.writeHead(200, {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
  "Connection": "keep-alive",
  "X-Accel-Buffering": "no",
});
res.flushHeaders();
res.write(":ok\n\n");
```

From ui/src/api/chat.ts (existing):

```typescript
export const chatApi = {
  listConversations(...),
  createConversation(...),
  getConversation(...),
  updateConversation(...),
  deleteConversation(...),
  listMessages(...),
  postMessage(...)
};
```

From ui/src/hooks/useChatMessages.ts:

```typescript
export function useChatMessages(conversationId: string | null) {
  // queryKey: ["chat", "messages", conversationId]
  // sendMutation invalidates: ["chat", "messages", conversationId] and ["chat", "conversations"]
}
```

Task 1: Server SSE streaming endpoint + edit/truncate service methods

- server/src/routes/chat.ts
- server/src/services/chat.ts
- server/src/routes/plugins.ts (lines 1140-1185 for SSE pattern)
- packages/db/src/schema/chat_messages.ts

server/src/services/chat.ts, server/src/routes/chat.ts

**1. Add three new methods to `chatService` in `server/src/services/chat.ts`:**

a) `editMessage(messageId: string, content: string)` -- Updates a message's content and updatedAt:

```typescript
async editMessage(messageId: string, content: string) {
  const [row] = await db
    .update(chatMessages)
    .set({ content, updatedAt: new Date() })
    .where(eq(chatMessages.id, messageId))
    .returning();
  return row;
},
```

b) `truncateMessagesAfter(conversationId: string, messageId: string)` -- Deletes all messages in the conversation created after the given message:

```typescript
async truncateMessagesAfter(conversationId: string, messageId: string) {
  // Get the target message's createdAt
  const [target] = await db
    .select({ createdAt: chatMessages.createdAt })
    .from(chatMessages)
    .where(eq(chatMessages.id, messageId));
  if (!target) return;
  await db
    .delete(chatMessages)
    .where(
      and(
        eq(chatMessages.conversationId, conversationId),
        gt(chatMessages.createdAt, target.createdAt),
      ),
    );
},
```

Import `gt` (and `and`, if it is not already imported) from `drizzle-orm` alongside the existing imports.

c) `streamEcho(content: string, signal: AbortSignal)` -- Async generator that yields fake tokens (stub for Phase 23 real LLM):

```typescript
async *streamEcho(content: string, signal: AbortSignal) {
  const words = content.split(/\s+/);
  for (const word of words) {
    if (signal.aborted) break;
    await new Promise((r) => setTimeout(r, 50));
    yield word + " ";
  }
},
```

**2. Add three new routes to `chatRoutes` in `server/src/routes/chat.ts`:**

a) `POST /conversations/:id/stream` -- SSE streaming endpoint:

```typescript
router.post("/conversations/:id/stream", async (req, res) => {
  assertBoard(req);
  const { content, agentId } = req.body;
  if (!content || typeof content !== "string") {
    res.status(400).json({ error: "content is required" });
    return;
  }

  // Set SSE headers and flush BEFORE any generation (PERF-02)
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");
  res.flushHeaders();
  res.write(":ok\n\n");

  const abort = new AbortController();
  // Listen on the response, not the request: on Node 16+ the request's
  // "close" event can fire once the request body has been read, which
  // would abort the stream before the client actually disconnects.
  res.on("close", () => abort.abort());

  try {
    let fullContent = "";
    for await (const token of svc.streamEcho(content, abort.signal)) {
      if (!res.writable) break;
      fullContent += token;
      res.write(`data: ${JSON.stringify({ token })}\n\n`);
    }
    // Commit the assistant message only if the stream ran to completion
    if (res.writable && !abort.signal.aborted) {
      const message = await svc.addMessage(req.params.id!, {
        role: "assistant",
        content: fullContent.trim(),
        agentId: agentId || undefined,
      });
      res.write(`data: ${JSON.stringify({ done: true, messageId: message.id, content: fullContent.trim() })}\n\n`);
    }
  } catch (err) {
    if (res.writable && !abort.signal.aborted) {
      res.write(`data: ${JSON.stringify({ error: "Stream error" })}\n\n`);
    }
  } finally {
    res.end();
  }
});
```

CRITICAL: `res.flushHeaders()` MUST be called before the for-await loop. Check `res.writable` before every `res.write()` (same guard pattern as `plugins.ts`).
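The two rules in the note above (serialize each event as one `data:` frame, and never write to a closed response) can be folded into a small helper. This is a minimal sketch, not code from the repository: `SseSink`, `formatSseData`, and `writeSseData` are hypothetical names, and `SseSink` is a stand-in for the subset of the Express response used here.

```typescript
// Hypothetical minimal view of a response object: just what the guard needs.
interface SseSink {
  readonly writable: boolean;
  write(chunk: string): boolean;
}

// One SSE frame: a single `data:` line followed by a blank line.
// JSON.stringify escapes newlines inside strings, so a multi-line token
// still fits on one `data:` line.
function formatSseData(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

// Write a frame only while the sink is still writable; report whether it was sent.
function writeSseData(sink: SseSink, payload: unknown): boolean {
  if (!sink.writable) return false;
  sink.write(formatSseData(payload));
  return true;
}

// Usage with an in-memory sink standing in for the HTTP response.
const sent: string[] = [];
const fakeResponse = {
  writable: true,
  write(chunk: string) {
    sent.push(chunk);
    return true;
  },
};
writeSseData(fakeResponse, { token: "line one\nline two " });
fakeResponse.writable = false; // simulate a client disconnect
writeSseData(fakeResponse, { token: "dropped" });
console.log(sent.length); // only the first frame was written
```

Centralizing the guard this way is one option for keeping every write in the route consistent; the inline `res.writable` checks in the route above achieve the same effect.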
b) `PATCH /conversations/:id/messages/:msgId` -- Edit message content:

```typescript
router.patch("/conversations/:id/messages/:msgId", async (req, res) => {
  assertBoard(req);
  const { content } = req.body;
  if (!content || typeof content !== "string") {
    res.status(400).json({ error: "content is required" });
    return;
  }
  const message = await svc.editMessage(req.params.msgId!, content);
  if (!message) {
    res.status(404).json({ error: "Message not found" });
    return;
  }
  res.json(message);
});
```

c) `DELETE /conversations/:id/messages/after/:msgId` -- Truncate messages after a given message:

```typescript
router.delete("/conversations/:id/messages/after/:msgId", async (req, res) => {
  assertBoard(req);
  await svc.truncateMessagesAfter(req.params.id!, req.params.msgId!);
  res.status(204).end();
});
```

cd /opt/nexus && pnpm --filter @paperclipai/server exec -- tsc --noEmit 2>&1 | head -20 && echo "--- PERF-02 flushHeaders-before-loop check ---" && python3 -c "
import re
code = open('server/src/routes/chat.ts').read()
flush_pos = code.find('flushHeaders')
loop_pos = code.find('for await')
assert flush_pos != -1, 'flushHeaders not found'
assert loop_pos != -1, 'for await not found'
assert flush_pos < loop_pos, f'PERF-02 FAIL: flushHeaders ({flush_pos}) must precede for-await ({loop_pos})'
print('PERF-02 OK: flushHeaders precedes for-await loop')
"

- grep -q "text/event-stream" server/src/routes/chat.ts
- grep -q "flushHeaders" server/src/routes/chat.ts
- grep -q "editMessage" server/src/services/chat.ts
- grep -q "truncateMessagesAfter" server/src/services/chat.ts
- grep -q "streamEcho" server/src/services/chat.ts
- grep -q "res.writable" server/src/routes/chat.ts
- grep -q "/conversations/:id/stream" server/src/routes/chat.ts
- grep -q "/conversations/:id/messages/:msgId" server/src/routes/chat.ts
- grep -q "/conversations/:id/messages/after/:msgId" server/src/routes/chat.ts
- flushHeaders() position in file must precede for-await loop position (PERF-02)

- POST /conversations/:id/stream SSE endpoint exists with proper headers flushed before generation
- PATCH /conversations/:id/messages/:msgId edits message content
- DELETE /conversations/:id/messages/after/:msgId truncates subsequent messages
- chatService has editMessage, truncateMessagesAfter, and streamEcho methods
- The stream route checks res.writable before every write (prevents write-after-end)
- PERF-02 verified: flushHeaders precedes the for-await generation loop
- Server TypeScript compiles without errors in chat files

Task 2: useStreamingChat hook, chat API stream method, and real unit tests

- ui/src/hooks/useChatMessages.ts
- ui/src/api/chat.ts
- ui/src/plugins/bridge.ts
- ui/src/hooks/useStreamingChat.test.ts

ui/src/hooks/useStreamingChat.ts, ui/src/hooks/useStreamingChat.test.ts, ui/src/api/chat.ts

**1. Add stream-related methods to `chatApi` in `ui/src/api/chat.ts`:**

```typescript
async postMessageAndStream(
  conversationId: string,
  data: { content: string; agentId?: string },
  callbacks: {
    onToken: (token: string) => void;
    onDone: (messageId: string, content: string) => void;
    onError: (error: string) => void;
  },
  signal?: AbortSignal,
) {
  // fetch() itself rejects when the signal aborts, so it lives inside the try block
  try {
    const res = await fetch(`/api/conversations/${conversationId}/stream`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(data),
      credentials: "include",
      signal,
    });
    if (!res.ok || !res.body) {
      callbacks.onError("Failed to start stream");
      return;
    }
    const reader = res.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      // Keep the last (possibly incomplete) line in the buffer for the next chunk
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const json = line.slice(6);
        try {
          const parsed = JSON.parse(json) as {
            token?: string;
            done?: boolean;
            messageId?: string;
            content?: string;
            error?: string;
          };
          if (parsed.token) callbacks.onToken(parsed.token);
          if (parsed.done && parsed.messageId) callbacks.onDone(parsed.messageId, parsed.content ?? "");
          if (parsed.error) callbacks.onError(parsed.error);
        } catch {
          /* ignore malformed lines */
        }
      }
    }
  } catch (err) {
    if (signal?.aborted) return; // Expected on stop
    callbacks.onError("Stream connection lost");
  }
},

async savePartialMessage(conversationId: string, data: { role: "assistant"; content: string; agentId?: string }) {
  return chatApi.postMessage(conversationId, data);
},
```

Use `fetch` with `ReadableStream` instead of `EventSource` because the endpoint is POST-based. `EventSource` only supports GET (Open Question 2 from RESEARCH.md).

**2. Create `ui/src/hooks/useStreamingChat.ts`:**

```typescript
import { useRef, useState, useTransition, useCallback } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { chatApi } from "../api/chat";

export function useStreamingChat(conversationId: string | null) {
  const [streamingContent, setStreamingContent] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef<AbortController | null>(null);
  const queryClient = useQueryClient();
  const [, startTransition] = useTransition();

  const startStream = useCallback(
    (userMessage: string, agentId?: string) => {
      if (!conversationId) return;
      setIsStreaming(true);
      setStreamingContent("");
      const abort = new AbortController();
      abortRef.current = abort;

      chatApi.postMessageAndStream(
        conversationId,
        { content: userMessage, agentId },
        {
          onToken: (token: string) => {
            startTransition(() => {
              setStreamingContent((prev) => prev + token);
            });
          },
          onDone: (messageId: string, content: string) => {
            setIsStreaming(false);
            setStreamingContent("");
            abortRef.current = null;
            // Leave the cached messages untouched here (no optimistic insert);
            // the invalidation below refetches the canonical list (Pitfall 2)
            queryClient.setQueryData(
              ["chat", "messages", conversationId],
              (old: unknown) => old, // Keep existing data -- invalidation will refetch
            );
            queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] });
            queryClient.invalidateQueries({ queryKey: ["chat", "conversations"] });
          },
          onError: (error: string) => {
            setIsStreaming(false);
            abortRef.current = null;
            console.error("[useStreamingChat] Stream error:", error);
          },
        },
        abort.signal,
      );
    },
    [conversationId, queryClient, startTransition],
  );

  const stop = useCallback(() => {
    abortRef.current?.abort();
    abortRef.current = null;
    const partial = streamingContent;
    setIsStreaming(false);
    setStreamingContent("");
    // Persist partial content with [stopped] suffix (Open Question 3)
    if (conversationId && partial.trim()) {
      chatApi.savePartialMessage(conversationId, {
        role: "assistant",
        content: partial.trim() + " [stopped]",
      }).then(() => {
        queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] });
      });
    }
  }, [conversationId, streamingContent, queryClient]);

  return { streamingContent, isStreaming, startStream, stop };
}
```

Key design decisions:

- `startTransition` wraps `setStreamingContent` so token appends don't block user input (PERF-02)
- `AbortController` for stop (CHAT-12) -- the server detects the dropped connection through its `close` listener and aborts generation
- On stop, partial content saved with " [stopped]" suffix to DB
- On done, cache invalidated (not optimistically set) to let React Query refetch the canonical data

**3. Replace Wave 0 test stubs in `ui/src/hooks/useStreamingChat.test.ts` with real unit tests:**

Replace the entire file. The hook's core logic (token accumulation, lifecycle, stop) can be tested by mocking `chatApi.postMessageAndStream` and `@tanstack/react-query`'s `useQueryClient`.
Use `renderHook` from `@testing-library/react` (already installed) with a `QueryClientProvider` wrapper:

```typescript
// @vitest-environment jsdom
import { describe, it, expect, vi, beforeEach } from "vitest";
import { renderHook, act } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { createElement } from "react";
import { useStreamingChat } from "./useStreamingChat";
import { chatApi } from "../api/chat";

// Mock chatApi
vi.mock("../api/chat", () => ({
  chatApi: {
    postMessageAndStream: vi.fn(),
    savePartialMessage: vi.fn().mockResolvedValue({}),
    postMessage: vi.fn().mockResolvedValue({}),
  },
}));

function createWrapper() {
  const queryClient = new QueryClient({
    defaultOptions: { queries: { retry: false } },
  });
  return ({ children }: { children: React.ReactNode }) =>
    createElement(QueryClientProvider, { client: queryClient }, children);
}

describe("useStreamingChat", () => {
  beforeEach(() => {
    vi.clearAllMocks();
  });

  it("accumulates tokens from onToken callbacks into streamingContent", async () => {
    // Mock postMessageAndStream to capture callbacks and simulate tokens
    let capturedCallbacks: any;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, callbacks, _signal) => {
        capturedCallbacks = callbacks;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    // Start stream
    act(() => {
      result.current.startStream("hello world");
    });
    expect(result.current.isStreaming).toBe(true);
    expect(result.current.streamingContent).toBe("");

    // Simulate tokens arriving
    act(() => {
      capturedCallbacks.onToken("Hello ");
    });
    expect(result.current.streamingContent).toBe("Hello ");

    act(() => {
      capturedCallbacks.onToken("world!");
    });
    expect(result.current.streamingContent).toBe("Hello world!");
  });

  it("sets isStreaming=true when stream starts, false on done", async () => {
    let capturedCallbacks: any;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, callbacks, _signal) => {
        capturedCallbacks = callbacks;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    expect(result.current.isStreaming).toBe(false);

    act(() => {
      result.current.startStream("test");
    });
    expect(result.current.isStreaming).toBe(true);

    act(() => {
      capturedCallbacks.onDone("msg-1", "test response");
    });
    expect(result.current.isStreaming).toBe(false);
    expect(result.current.streamingContent).toBe("");
  });

  it("stop() aborts the controller and sets isStreaming=false", async () => {
    let capturedSignal: AbortSignal | undefined;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, _callbacks, signal) => {
        capturedSignal = signal;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    act(() => {
      result.current.startStream("test");
    });
    expect(result.current.isStreaming).toBe(true);
    expect(capturedSignal?.aborted).toBe(false);

    act(() => {
      result.current.stop();
    });
    expect(result.current.isStreaming).toBe(false);
    expect(capturedSignal?.aborted).toBe(true);
  });

  it("handles SSE error by setting isStreaming=false", async () => {
    let capturedCallbacks: any;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, callbacks, _signal) => {
        capturedCallbacks = callbacks;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    act(() => {
      result.current.startStream("test");
    });
    expect(result.current.isStreaming).toBe(true);

    act(() => {
      capturedCallbacks.onError("Stream error");
    });
    expect(result.current.isStreaming).toBe(false);
  });

  it("does nothing when conversationId is null", () => {
    const { result } = renderHook(
      () => useStreamingChat(null),
      { wrapper: createWrapper() },
    );

    act(() => {
      result.current.startStream("test");
    });
    expect(chatApi.postMessageAndStream).not.toHaveBeenCalled();
    expect(result.current.isStreaming).toBe(false);
  });
});
```

IMPORTANT: This replaces ALL `it.todo()` stubs with real tests. After this task runs, `useStreamingChat.test.ts` must have zero `it.todo()` entries.

cd /opt/nexus && pnpm --filter @paperclipai/ui vitest run src/hooks/useStreamingChat.test.ts --reporter=verbose 2>&1 | tail -30

- grep -q "useStreamingChat" ui/src/hooks/useStreamingChat.ts
- grep -q "postMessageAndStream" ui/src/api/chat.ts
- grep -q "startTransition" ui/src/hooks/useStreamingChat.ts
- grep -q "AbortController" ui/src/hooks/useStreamingChat.ts
- grep -q "\\[stopped\\]" ui/src/hooks/useStreamingChat.ts
- grep -q "savePartialMessage" ui/src/api/chat.ts
- grep -q "ReadableStream\\|getReader" ui/src/api/chat.ts
- NOT grep -q "it.todo" ui/src/hooks/useStreamingChat.test.ts (all stubs replaced with real tests)
- vitest reports 5 passing tests in useStreamingChat.test.ts

- useStreamingChat hook exists with startStream, stop, streamingContent, isStreaming
- chatApi.postMessageAndStream uses fetch ReadableStream for POST SSE
- chatApi.savePartialMessage persists partial content on stop
- startTransition used for token accumulation (PERF-02)
- AbortController used for stop functionality (CHAT-12)
- Partial message saved with " [stopped]" suffix on stop
- Wave 0 test stubs REPLACED with 5 real unit tests covering: token accumulation, isStreaming lifecycle, stop() abort, error handling, null conversationId guard
- All tests pass (no it.todo remaining)
- UI TypeScript compiles clean

- `pnpm --filter @paperclipai/ui exec -- tsc --noEmit` passes
- `pnpm --filter @paperclipai/server exec -- tsc --noEmit` passes (or pre-existing non-chat errors only)
- `pnpm --filter @paperclipai/ui vitest run src/hooks/useStreamingChat.test.ts` passes with 5 real tests (zero todos)
- Server routes include stream, edit, truncate endpoints
- Client hook manages full SSE lifecycle
- PERF-02 verified: flushHeaders position precedes for-await in chat.ts

- Tokens stream from server to client via SSE (CHAT-01)
- Stop generation aborts the connection and saves partial content (CHAT-12)
- SSE headers flushed before generation begins (PERF-02)
- Edit and truncate server endpoints ready for Plan 03 UI
- useStreamingChat has real passing unit tests (not stubs)

After completion, create `.planning/phases/22-agent-streaming/22-01-SUMMARY.md`