nexus/.planning/phases/22-agent-streaming/22-01-PLAN.md

---
phase: 22-agent-streaming
plan: 01
type: execute
wave: 1
depends_on:
  - 22-00
files_modified:
  - server/src/services/chat.ts
  - server/src/routes/chat.ts
  - ui/src/hooks/useStreamingChat.ts
  - ui/src/hooks/useStreamingChat.test.ts
  - ui/src/api/chat.ts
autonomous: true
requirements:
  - CHAT-01
  - CHAT-12
  - PERF-02
must_haves:
  truths:
    - "Server SSE endpoint streams token events as text/event-stream"
    - "Client hook accumulates tokens into streamingContent string"
    - "User can stop a stream mid-generation and partial content is preserved"
    - "First SSE headers are flushed before any LLM generation begins"
  artifacts:
    - path: server/src/routes/chat.ts
      provides: "POST /conversations/:id/stream SSE endpoint"
      contains: "text/event-stream"
    - path: server/src/services/chat.ts
      provides: "editMessage and truncateMessagesAfter methods"
      contains: "editMessage"
    - path: ui/src/hooks/useStreamingChat.ts
      provides: "SSE lifecycle hook"
      exports:
        - useStreamingChat
    - path: ui/src/api/chat.ts
      provides: "postMessageAndStream method"
      contains: "postMessageAndStream"
  key_links:
    - from: ui/src/hooks/useStreamingChat.ts
      to: "server POST /conversations/:id/stream"
      via: "fetch with ReadableStream"
      pattern: "fetch.*stream"
    - from: server/src/routes/chat.ts
      to: server/src/services/chat.ts
      via: "svc.addMessage for final commit"
      pattern: "svc.addMessage"
---

This plan adds an SSE streaming endpoint on the server and a `useStreamingChat` hook on the client. The server uses a stub echo stream (it repeats the user message as fake tokens) because real LLM integration is planned for Phase 23. The client uses `fetch` with a `ReadableStream` rather than native `EventSource` because the stream endpoint is POST-based and `EventSource` only supports GET.

Purpose: Enable real-time token-by-token message delivery (CHAT-01), stop generation (CHAT-12), and sub-100ms first-token latency (PERF-02).

Output: Server SSE route, chat service edit/truncate methods, client streaming hook, updated chat API client.

<execution_context> @$HOME/.claude/get-shit-done/workflows/execute-plan.md @$HOME/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/22-agent-streaming/22-RESEARCH.md
@.planning/phases/22-agent-streaming/22-00-SUMMARY.md

From server/src/routes/chat.ts (existing):

```typescript
export function chatRoutes(db: Db): Router {
  const router = Router();
  const svc = chatService(db);
  // ... existing routes: POST /conversations, GET /conversations/:id, etc.
}
```

From server/src/services/chat.ts (existing):

export function chatService(db: Db) {
  return {
    createConversation(...), listConversations(...), getConversation(...),
    updateConversation(...), softDeleteConversation(...),
    listMessages(...), addMessage(...)
  };
}

From server/src/routes/plugins.ts (SSE pattern):

res.writeHead(200, {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
  "Connection": "keep-alive",
  "X-Accel-Buffering": "no",
});
res.flushHeaders();
res.write(":ok\n\n");

From ui/src/api/chat.ts (existing):

export const chatApi = {
  listConversations(...), createConversation(...), getConversation(...),
  updateConversation(...), deleteConversation(...), listMessages(...), postMessage(...)
};

From ui/src/hooks/useChatMessages.ts:

export function useChatMessages(conversationId: string | null) {
  // queryKey: ["chat", "messages", conversationId]
  // sendMutation invalidates: ["chat", "messages", conversationId] and ["chat", "conversations"]
}

Task 1: Server SSE streaming endpoint + edit/truncate service methods

Read first:
- server/src/routes/chat.ts
- server/src/services/chat.ts
- server/src/routes/plugins.ts (lines 1140-1185 for SSE pattern)
- packages/db/src/schema/chat_messages.ts

Files: server/src/services/chat.ts, server/src/routes/chat.ts

**1. Add three new methods to `chatService` in `server/src/services/chat.ts`:**

a) editMessage(messageId: string, content: string) -- Updates a message's content and updatedAt:

async editMessage(messageId: string, content: string) {
  const [row] = await db
    .update(chatMessages)
    .set({ content, updatedAt: new Date() })
    .where(eq(chatMessages.id, messageId))
    .returning();
  return row;
},

b) truncateMessagesAfter(conversationId: string, messageId: string) -- Deletes all messages in the conversation created after the given message:

async truncateMessagesAfter(conversationId: string, messageId: string) {
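  // Look up the target message's createdAt, scoped to this conversation so a
  // message ID from another conversation cannot trigger a truncation here
  const [target] = await db
    .select({ createdAt: chatMessages.createdAt })
    .from(chatMessages)
    .where(
      and(
        eq(chatMessages.id, messageId),
        eq(chatMessages.conversationId, conversationId),
      ),
    );
  if (!target) return;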
  await db
    .delete(chatMessages)
    .where(
      and(
        eq(chatMessages.conversationId, conversationId),
        gt(chatMessages.createdAt, target.createdAt),
      ),
    );
},

Import gt from drizzle-orm alongside existing imports.
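
The truncation rule can be illustrated without a database. The following is a minimal in-memory sketch, not the service implementation: `Msg` and `truncateAfter` are hypothetical names introduced here, and the sketch assumes the target lookup is scoped to the conversation. It also makes one edge case visible: a message that shares the target's exact `createdAt` survives a strict greater-than comparison.

```typescript
// Hypothetical in-memory model of a chat message (not the real schema).
interface Msg {
  id: string;
  conversationId: string;
  createdAt: Date;
}

// Mirrors the rule "delete everything in the conversation created after the target".
// Returns the messages that would remain.
function truncateAfter(all: Msg[], conversationId: string, messageId: string): Msg[] {
  const target = all.find(
    (m) => m.id === messageId && m.conversationId === conversationId,
  );
  if (!target) return all; // unknown target: nothing is deleted
  return all.filter(
    (m) =>
      m.conversationId !== conversationId || // other conversations are untouched
      m.createdAt.getTime() <= target.createdAt.getTime(), // only strictly later rows go
  );
}

const msgs: Msg[] = [
  { id: "a", conversationId: "c1", createdAt: new Date(1000) },
  { id: "b", conversationId: "c1", createdAt: new Date(2000) },
  { id: "c", conversationId: "c1", createdAt: new Date(3000) },
  { id: "x", conversationId: "c2", createdAt: new Date(5000) },
];

// Truncating after "a" keeps "a" itself and the message from the other conversation.
const remaining = truncateAfter(msgs, "c1", "a").map((m) => m.id);
```

If two messages can be written within the same timestamp resolution, a tie-breaker such as a monotonically increasing sequence column may be worth considering; whether the schema has one is not stated in this plan.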

c) streamEcho(content: string, signal: AbortSignal) -- Async generator that yields fake tokens (stub for Phase 23 real LLM):

async *streamEcho(content: string, signal: AbortSignal) {
  // filter(Boolean) drops empty strings produced by leading or trailing whitespace
  const words = content.split(/\s+/).filter(Boolean);
  for (const word of words) {
    if (signal.aborted) break;
    await new Promise((r) => setTimeout(r, 50));
    yield word + " ";
  }
},
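
To see what the stub emits and what the route would later persist, the tokenization can be previewed synchronously. This is an illustrative sketch: `echoTokens` and `assembleFinalContent` are hypothetical helpers, not part of `chatService`, and dropping empty strings caused by leading or trailing whitespace is an assumption about the desired behavior.

```typescript
// Hypothetical synchronous preview of the tokens the echo stub would yield.
function echoTokens(content: string): string[] {
  return content
    .split(/\s+/)
    .filter(Boolean) // assumption: skip empty strings from leading/trailing whitespace
    .map((word) => word + " ");
}

// Concatenate tokens and trim, as the stream route does before persisting.
function assembleFinalContent(tokens: string[]): string {
  return tokens.join("").trim();
}

const tokens = echoTokens("hello   streaming\nworld");
const finalContent = assembleFinalContent(tokens);
// tokens: ["hello ", "streaming ", "world "]
// finalContent: "hello streaming world"
```

Because runs of whitespace collapse to single spaces, the echoed message is a whitespace-normalized copy of the input rather than a byte-identical one, which is acceptable for a stub.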

2. Add three new routes to chatRoutes in server/src/routes/chat.ts:

a) POST /conversations/:id/stream -- SSE streaming endpoint:

router.post("/conversations/:id/stream", async (req, res) => {
  assertBoard(req);
  const { content, agentId } = req.body;
  if (!content || typeof content !== "string") {
    res.status(400).json({ error: "content is required" });
    return;
  }

  // Set SSE headers and flush BEFORE any generation (PERF-02)
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");
  res.flushHeaders();
  res.write(":ok\n\n");

  const abort = new AbortController();
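  // Listen on the response: on recent Node versions the request's "close" event
  // can fire once the body has been read, not only when the client disconnects
  res.on("close", () => abort.abort());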

  try {
    let fullContent = "";
    for await (const token of svc.streamEcho(content, abort.signal)) {
      if (!res.writable) break;
      fullContent += token;
      res.write(`data: ${JSON.stringify({ token })}\n\n`);
    }
    if (res.writable && !abort.signal.aborted) {
      const message = await svc.addMessage(req.params.id!, {
        role: "assistant",
        content: fullContent.trim(),
        agentId: agentId || undefined,
      });
      res.write(`data: ${JSON.stringify({ done: true, messageId: message.id, content: fullContent.trim() })}\n\n`);
    }
  } catch (err) {
    if (res.writable && !abort.signal.aborted) {
      res.write(`data: ${JSON.stringify({ error: "Stream error" })}\n\n`);
    }
  } finally {
    res.end();
  }
});

CRITICAL: res.flushHeaders() MUST be called before the for-await loop. Check res.writable before every res.write() (same guard pattern as plugins.ts).
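
The write guard can be factored into a small helper so the check is not repeated by hand. This is a sketch under assumptions: `SseSink` is a hypothetical structural type covering only the two members used here (an Express response object should satisfy it), and `formatSseData` and `writeSseData` are not existing functions in the codebase.

```typescript
// Minimal structural type: only what the helper needs from a response object.
interface SseSink {
  writable: boolean;
  write(chunk: string): boolean;
}

// Serialize a payload as one SSE "data:" frame, terminated by a blank line.
function formatSseData(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

// Write a frame only while the sink is still writable; report whether it was written.
function writeSseData(sink: SseSink, payload: unknown): boolean {
  if (!sink.writable) return false;
  sink.write(formatSseData(payload));
  return true;
}

// Usage with a fake sink standing in for the HTTP response.
const written: string[] = [];
const fakeSink: SseSink = {
  writable: true,
  write(chunk) {
    written.push(chunk);
    return true;
  },
};

const first = writeSseData(fakeSink, { token: "hi " });
fakeSink.writable = false; // simulate a closed connection
const second = writeSseData(fakeSink, { token: "ignored" });
```

The boolean return value lets a generation loop break out as soon as a write is skipped, instead of checking `writable` separately.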

b) PATCH /conversations/:id/messages/:msgId -- Edit message content:

router.patch("/conversations/:id/messages/:msgId", async (req, res) => {
  assertBoard(req);
  const { content } = req.body;
  if (!content || typeof content !== "string") {
    res.status(400).json({ error: "content is required" });
    return;
  }
  const message = await svc.editMessage(req.params.msgId!, content);
  if (!message) {
    res.status(404).json({ error: "Message not found" });
    return;
  }
  res.json(message);
});

c) DELETE /conversations/:id/messages/after/:msgId -- Truncate messages after a given message:

router.delete("/conversations/:id/messages/after/:msgId", async (req, res) => {
  assertBoard(req);
  await svc.truncateMessagesAfter(req.params.id!, req.params.msgId!);
  res.status(204).end();
});

Verify:

cd /opt/nexus && pnpm --filter @paperclipai/server exec -- tsc --noEmit 2>&1 | head -20 && echo "--- PERF-02 flushHeaders-before-loop check ---" && python3 -c "
import re
code = open('server/src/routes/chat.ts').read()
flush_pos = code.find('flushHeaders')
loop_pos = code.find('for await')
assert flush_pos != -1, 'flushHeaders not found'
assert loop_pos != -1, 'for await not found'
assert flush_pos < loop_pos, f'PERF-02 FAIL: flushHeaders ({flush_pos}) must precede for-await ({loop_pos})'
print('PERF-02 OK: flushHeaders precedes for-await loop')
"

Acceptance criteria:

- grep -q "text/event-stream" server/src/routes/chat.ts
- grep -q "flushHeaders" server/src/routes/chat.ts
- grep -q "editMessage" server/src/services/chat.ts
- grep -q "truncateMessagesAfter" server/src/services/chat.ts
- grep -q "streamEcho" server/src/services/chat.ts
- grep -q "res.writable" server/src/routes/chat.ts
- grep -q "/conversations/:id/stream" server/src/routes/chat.ts
- grep -q "/conversations/:id/messages/:msgId" server/src/routes/chat.ts
- grep -q "/conversations/:id/messages/after/:msgId" server/src/routes/chat.ts
- flushHeaders() position in file must precede for-await loop position (PERF-02)

Done when:

- POST /conversations/:id/stream SSE endpoint exists with proper headers flushed before generation
- PATCH /conversations/:id/messages/:msgId edits message content
- DELETE /conversations/:id/messages/after/:msgId truncates subsequent messages
- chatService has editMessage, truncateMessagesAfter, and streamEcho methods
- The stream route checks res.writable before every write (prevents write-after-end)
- PERF-02 verified: flushHeaders precedes the for-await generation loop
- Server TypeScript compiles without errors in chat files

Task 2: useStreamingChat hook, chat API stream method, and real unit tests

Read first:
- ui/src/hooks/useChatMessages.ts
- ui/src/api/chat.ts
- ui/src/plugins/bridge.ts
- ui/src/hooks/useStreamingChat.test.ts

Files: ui/src/hooks/useStreamingChat.ts, ui/src/hooks/useStreamingChat.test.ts, ui/src/api/chat.ts

**1. Add stream-related methods to `chatApi` in `ui/src/api/chat.ts`:**

async postMessageAndStream(
  conversationId: string,
  data: { content: string; agentId?: string },
  callbacks: {
    onToken: (token: string) => void;
    onDone: (messageId: string, content: string) => void;
    onError: (error: string) => void;
  },
  signal?: AbortSignal,
) {
  try {
    // fetch is inside the try so that an abort or network failure before the
    // response arrives is handled here instead of rejecting the returned promise
    const res = await fetch(`/api/conversations/${conversationId}/stream`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(data),
      credentials: "include",
      signal,
    });
    if (!res.ok || !res.body) {
      callbacks.onError("Failed to start stream");
      return;
    }
    const reader = res.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters split across chunks intact
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? ""; // keep the trailing partial line for the next chunk
      for (const line of lines) {
        if (!line.startsWith("data: ")) continue; // skips ":ok" comments and blank separators
        const json = line.slice(6);
        try {
          const parsed = JSON.parse(json) as { token?: string; done?: boolean; messageId?: string; content?: string; error?: string };
          if (parsed.token) callbacks.onToken(parsed.token);
          if (parsed.done && parsed.messageId) callbacks.onDone(parsed.messageId, parsed.content ?? "");
          if (parsed.error) callbacks.onError(parsed.error);
        } catch { /* ignore malformed lines */ }
      }
    }
  } catch (err) {
    if (signal?.aborted) return; // Expected on stop
    callbacks.onError("Stream connection lost");
  }
},

async savePartialMessage(conversationId: string, data: { role: "assistant"; content: string; agentId?: string }) {
  return chatApi.postMessage(conversationId, data);
},

Use fetch with ReadableStream instead of EventSource because the endpoint is POST-based. EventSource only supports GET (Open Question 2 from RESEARCH.md).
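
The buffering logic in `postMessageAndStream` can be isolated into a pure function, which makes the chunk-boundary handling easier to reason about and to test. A minimal sketch, assuming the server only emits single-line `data:` frames carrying JSON; `createSseParser` and `StreamEvent` are hypothetical names, not part of `chatApi`.

```typescript
// Shape of the JSON payloads the stream endpoint emits in this plan.
interface StreamEvent {
  token?: string;
  done?: boolean;
  messageId?: string;
  content?: string;
  error?: string;
}

// Returns a push function that accepts decoded text chunks and returns
// every complete event found so far, keeping any partial line buffered.
function createSseParser(): (chunk: string) => StreamEvent[] {
  let buffer = "";
  return (chunk: string): StreamEvent[] => {
    buffer += chunk;
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // last element is an incomplete line (or "")
    const events: StreamEvent[] = [];
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue; // skip comments such as ":ok" and blank lines
      try {
        events.push(JSON.parse(line.slice(6)) as StreamEvent);
      } catch {
        // ignore malformed lines
      }
    }
    return events;
  };
}

// A frame split across two network chunks is only emitted once it is complete.
const push = createSseParser();
const batch1 = push(':ok\n\ndata: {"tok');
const batch2 = push('en":"Hello "}\n\ndata: {"done":true,"messageId":"m1","content":"Hello"}\n\n');
```

Here `batch1` is empty because the only `data:` line is still incomplete, and `batch2` contains the token event followed by the done event.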

2. Create ui/src/hooks/useStreamingChat.ts:

import { useRef, useState, useTransition, useCallback } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { chatApi } from "../api/chat";

export function useStreamingChat(conversationId: string | null) {
  const [streamingContent, setStreamingContent] = useState<string>("");
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef<AbortController | null>(null);
  const queryClient = useQueryClient();
  const [, startTransition] = useTransition();

  const startStream = useCallback(
    (userMessage: string, agentId?: string) => {
      if (!conversationId) return;

      setIsStreaming(true);
      setStreamingContent("");

      const abort = new AbortController();
      abortRef.current = abort;

      chatApi.postMessageAndStream(
        conversationId,
        { content: userMessage, agentId },
        {
          onToken: (token: string) => {
            startTransition(() => {
              setStreamingContent((prev) => prev + token);
            });
          },
          onDone: (messageId: string, content: string) => {
            setIsStreaming(false);
            setStreamingContent("");
            abortRef.current = null;
            // The final message is already persisted server-side; invalidate (rather than
            // optimistically set) the cache so React Query refetches the canonical data.
            queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] });
            queryClient.invalidateQueries({ queryKey: ["chat", "conversations"] });
          },
          onError: (error: string) => {
            setIsStreaming(false);
            abortRef.current = null;
            console.error("[useStreamingChat] Stream error:", error);
          },
        },
        abort.signal,
      );
    },
    [conversationId, queryClient, startTransition],
  );

  const stop = useCallback(() => {
    abortRef.current?.abort();
    abortRef.current = null;
    const partial = streamingContent;
    setIsStreaming(false);
    setStreamingContent("");
    // Persist partial content with [stopped] suffix (Open Question 3)
    if (conversationId && partial.trim()) {
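      chatApi.savePartialMessage(conversationId, {
        role: "assistant",
        content: partial.trim() + " [stopped]",
      }).then(() => {
        queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] });
      }).catch((err: unknown) => {
        // Avoid an unhandled rejection if the save request fails
        console.error("[useStreamingChat] Failed to save partial message:", err);
      });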
    }
  }, [conversationId, streamingContent, queryClient]);

  return { streamingContent, isStreaming, startStream, stop };
}

Key design decisions:

  • startTransition wraps setStreamingContent so token appends don't block user input (PERF-02)
  • AbortController for stop (CHAT-12) -- aborting the fetch closes the connection, which the server detects through its "close" handler
  • On stop, partial content saved with " [stopped]" suffix to DB
  • On done, cache invalidated (not optimistically set) to let React Query refetch the canonical data
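
The lifecycle behind these decisions can be summarized as a small state machine. The reducer below is an illustrative sketch, not how the hook is implemented (the hook uses separate `useState` calls); `StreamState`, `StreamAction`, and `streamReducer` are hypothetical names.

```typescript
interface StreamState {
  isStreaming: boolean;
  streamingContent: string;
  // Content to persist after a stop, or null when nothing should be saved.
  partialToSave: string | null;
}

type StreamAction =
  | { type: "start" }
  | { type: "token"; token: string }
  | { type: "done" }
  | { type: "error" }
  | { type: "stop" };

const initialState: StreamState = {
  isStreaming: false,
  streamingContent: "",
  partialToSave: null,
};

function streamReducer(state: StreamState, action: StreamAction): StreamState {
  switch (action.type) {
    case "start":
      return { isStreaming: true, streamingContent: "", partialToSave: null };
    case "token":
      return { ...state, streamingContent: state.streamingContent + action.token };
    case "done":
      // The server has persisted the final message; clear the local buffer.
      return { ...initialState };
    case "error":
      // Mirrors the hook: streaming stops, accumulated content is left in place.
      return { ...state, isStreaming: false };
    case "stop": {
      const partial = state.streamingContent.trim();
      return {
        isStreaming: false,
        streamingContent: "",
        partialToSave: partial ? partial + " [stopped]" : null,
      };
    }
  }
}

// Replay a stream that is stopped after two tokens.
const actions: StreamAction[] = [
  { type: "start" },
  { type: "token", token: "Hello " },
  { type: "token", token: "wor" },
  { type: "stop" },
];
const finalState = actions.reduce(streamReducer, initialState);
// finalState.partialToSave === "Hello wor [stopped]"
```

Writing the transitions out this way also shows that a stop with no accumulated content saves nothing, matching the `partial.trim()` check in the hook.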

3. Replace Wave 0 test stubs in ui/src/hooks/useStreamingChat.test.ts with real unit tests:

Replace the entire file. The hook's core logic (token accumulation, lifecycle, stop) can be tested by mocking chatApi.postMessageAndStream and rendering the hook inside a real QueryClientProvider. Use renderHook from @testing-library/react (already installed) with a wrapper that supplies a fresh QueryClient:

// @vitest-environment jsdom
import { describe, it, expect, vi, beforeEach } from "vitest";
import { renderHook, act } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { createElement } from "react";
import { useStreamingChat } from "./useStreamingChat";
import { chatApi } from "../api/chat";

// Mock chatApi
vi.mock("../api/chat", () => ({
  chatApi: {
    postMessageAndStream: vi.fn(),
    savePartialMessage: vi.fn().mockResolvedValue({}),
    postMessage: vi.fn().mockResolvedValue({}),
  },
}));

function createWrapper() {
  const queryClient = new QueryClient({
    defaultOptions: { queries: { retry: false } },
  });
  return ({ children }: { children: React.ReactNode }) =>
    createElement(QueryClientProvider, { client: queryClient }, children);
}

describe("useStreamingChat", () => {
  beforeEach(() => {
    vi.clearAllMocks();
  });

  it("accumulates tokens from onToken callbacks into streamingContent", async () => {
    // Mock postMessageAndStream to capture callbacks and simulate tokens
    let capturedCallbacks: any;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, callbacks, _signal) => {
        capturedCallbacks = callbacks;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    // Start stream
    act(() => {
      result.current.startStream("hello world");
    });

    expect(result.current.isStreaming).toBe(true);
    expect(result.current.streamingContent).toBe("");

    // Simulate tokens arriving
    act(() => {
      capturedCallbacks.onToken("Hello ");
    });
    expect(result.current.streamingContent).toBe("Hello ");

    act(() => {
      capturedCallbacks.onToken("world!");
    });
    expect(result.current.streamingContent).toBe("Hello world!");
  });

  it("sets isStreaming=true when stream starts, false on done", async () => {
    let capturedCallbacks: any;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, callbacks, _signal) => {
        capturedCallbacks = callbacks;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    expect(result.current.isStreaming).toBe(false);

    act(() => {
      result.current.startStream("test");
    });
    expect(result.current.isStreaming).toBe(true);

    act(() => {
      capturedCallbacks.onDone("msg-1", "test response");
    });
    expect(result.current.isStreaming).toBe(false);
    expect(result.current.streamingContent).toBe("");
  });

  it("stop() aborts the controller and sets isStreaming=false", async () => {
    let capturedSignal: AbortSignal | undefined;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, _callbacks, signal) => {
        capturedSignal = signal;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    act(() => {
      result.current.startStream("test");
    });
    expect(result.current.isStreaming).toBe(true);
    expect(capturedSignal?.aborted).toBe(false);

    act(() => {
      result.current.stop();
    });
    expect(result.current.isStreaming).toBe(false);
    expect(capturedSignal?.aborted).toBe(true);
  });

  it("handles SSE error by setting isStreaming=false", async () => {
    let capturedCallbacks: any;
    vi.mocked(chatApi.postMessageAndStream).mockImplementation(
      (_convId, _data, callbacks, _signal) => {
        capturedCallbacks = callbacks;
        return Promise.resolve();
      },
    );

    const { result } = renderHook(
      () => useStreamingChat("conv-1"),
      { wrapper: createWrapper() },
    );

    act(() => {
      result.current.startStream("test");
    });
    expect(result.current.isStreaming).toBe(true);

    act(() => {
      capturedCallbacks.onError("Stream error");
    });
    expect(result.current.isStreaming).toBe(false);
  });

  it("does nothing when conversationId is null", () => {
    const { result } = renderHook(
      () => useStreamingChat(null),
      { wrapper: createWrapper() },
    );

    act(() => {
      result.current.startStream("test");
    });
    expect(chatApi.postMessageAndStream).not.toHaveBeenCalled();
    expect(result.current.isStreaming).toBe(false);
  });
});

IMPORTANT: This replaces ALL it.todo() stubs with real tests. After this task runs, useStreamingChat.test.ts must have zero it.todo() entries.

Verify:

cd /opt/nexus && pnpm --filter @paperclipai/ui vitest run src/hooks/useStreamingChat.test.ts --reporter=verbose 2>&1 | tail -30

<acceptance_criteria>
- grep -q "useStreamingChat" ui/src/hooks/useStreamingChat.ts
- grep -q "postMessageAndStream" ui/src/api/chat.ts
- grep -q "startTransition" ui/src/hooks/useStreamingChat.ts
- grep -q "AbortController" ui/src/hooks/useStreamingChat.ts
- grep -q "\[stopped\]" ui/src/hooks/useStreamingChat.ts
- grep -q "savePartialMessage" ui/src/api/chat.ts
- grep -q "ReadableStream\|getReader" ui/src/api/chat.ts
- NOT grep -q "it.todo" ui/src/hooks/useStreamingChat.test.ts (all stubs replaced with real tests)
- vitest reports 5 passing tests in useStreamingChat.test.ts
</acceptance_criteria>

Done when:

- useStreamingChat hook exists with startStream, stop, streamingContent, isStreaming
- chatApi.postMessageAndStream uses fetch ReadableStream for POST SSE
- chatApi.savePartialMessage persists partial content on stop
- startTransition used for token accumulation (PERF-02)
- AbortController used for stop functionality (CHAT-12)
- Partial message saved with " [stopped]" suffix on stop
- Wave 0 test stubs REPLACED with 5 real unit tests covering: token accumulation, isStreaming lifecycle, stop() abort, error handling, null conversationId guard
- All tests pass (no it.todo remaining)
- UI TypeScript compiles clean

- `pnpm --filter @paperclipai/ui exec -- tsc --noEmit` passes
- `pnpm --filter @paperclipai/server exec -- tsc --noEmit` passes (or pre-existing non-chat errors only)
- `pnpm --filter @paperclipai/ui vitest run src/hooks/useStreamingChat.test.ts` passes with 5 real tests (zero todos)
- Server routes include stream, edit, truncate endpoints
- Client hook manages full SSE lifecycle
- PERF-02 verified: flushHeaders position precedes for-await in chat.ts

<success_criteria>

  • Tokens stream from server to client via SSE (CHAT-01)
  • Stop generation aborts the connection and saves partial content (CHAT-12)
  • SSE headers flushed before generation begins (PERF-02)
  • Edit and truncate server endpoints ready for Plan 03 UI
  • useStreamingChat has real passing unit tests (not stubs)

</success_criteria>

After completion, create `.planning/phases/22-agent-streaming/22-01-SUMMARY.md`