feat(22-01): add useStreamingChat hook, chat API stream method, and unit tests

- Add postMessageAndStream and savePartialMessage to chatApi (fetch ReadableStream for POST SSE)
- Create useStreamingChat hook with startStream, stop, streamingContent, isStreaming
- startTransition wraps token updates to avoid blocking user input (PERF-02)
- AbortController used for stop functionality (CHAT-12)
- stop() saves partial content with [stopped] suffix to DB
- Add @testing-library/react devDependency to enable renderHook testing
- 5 passing unit tests: token accumulation, lifecycle, stop/abort, error, null guard
This commit is contained in:
Nexus Dev 2026-04-01 18:10:02 +00:00
parent fcc143cc44
commit 0d876ced26
5 changed files with 368 additions and 19 deletions

88
pnpm-lock.yaml generated
View file

@ -645,9 +645,6 @@ importers:
'@tanstack/react-query':
specifier: ^5.90.21
version: 5.90.21(react@19.2.4)
'@tanstack/react-virtual':
specifier: ^3.13.23
version: 3.13.23(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
class-variance-authority:
specifier: ^0.7.1
version: 0.7.1
@ -700,6 +697,9 @@ importers:
'@tailwindcss/vite':
specifier: ^4.0.7
version: 4.1.18(vite@6.4.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(tsx@4.21.0))
'@testing-library/react':
specifier: ^16.0.0
version: 16.3.2(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
'@types/diff':
specifier: ^8.0.0
version: 8.0.0
@ -3345,14 +3345,27 @@ packages:
peerDependencies:
react: ^18 || ^19
'@tanstack/react-virtual@3.13.23':
resolution: {integrity: sha512-XnMRnHQ23piOVj2bzJqHrRrLg4r+F86fuBcwteKfbIjJrtGxb4z7tIvPVAe4B+4UVwo9G4Giuz5fmapcrnZ0OQ==}
peerDependencies:
react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0
react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0
'@testing-library/dom@10.4.1':
resolution: {integrity: sha512-o4PXJQidqJl82ckFaXUeoAW+XysPLauYI43Abki5hABd853iMhitooc6znOnczgbTYmEP6U6/y1ZyKAIsvMKGg==}
engines: {node: '>=18'}
'@tanstack/virtual-core@3.13.23':
resolution: {integrity: sha512-zSz2Z2HNyLjCplANTDyl3BcdQJc2k1+yyFoKhNRmCr7V7dY8o8q5m8uFTI1/Pg1kL+Hgrz6u3Xo6eFUB7l66cg==}
'@testing-library/react@16.3.2':
resolution: {integrity: sha512-XU5/SytQM+ykqMnAnvB2umaJNIOsLF3PVv//1Ew4CTcpz0/BRyy/af40qqrt7SjKpDdT1saBMc42CUok5gaw+g==}
engines: {node: '>=18'}
peerDependencies:
'@testing-library/dom': ^10.0.0
'@types/react': ^18.0.0 || ^19.0.0
'@types/react-dom': ^18.0.0 || ^19.0.0
react: ^18.0.0 || ^19.0.0
react-dom: ^18.0.0 || ^19.0.0
peerDependenciesMeta:
'@types/react':
optional: true
'@types/react-dom':
optional: true
'@types/aria-query@5.0.4':
resolution: {integrity: sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw==}
'@types/babel__core@7.20.5':
resolution: {integrity: sha512-qoQprZvz5wQFJwMDqeseRXWv3rqMvhgpbXFfVyWhbx9X47POIA6i/+dXefEmZKoAgOaTdaIgNSMqMIU61yRyzA==}
@ -3648,6 +3661,14 @@ packages:
anser@2.3.5:
resolution: {integrity: sha512-vcZjxvvVoxTeR5XBNJB38oTu/7eDCZlwdz32N1eNgpyPF7j/Z7Idf+CUwQOkKKpJ7RJyjxgLHCM7vdIK0iCNMQ==}
ansi-regex@5.0.1:
resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==}
engines: {node: '>=8'}
ansi-styles@5.2.0:
resolution: {integrity: sha512-Cxwpt2SfTzTtXcfOlzGEee8O+c+MmUgGrNiBcXnuWxuFJHe6a5Hz7qwhwe5OgaSYI0IJvkLqWX1ASG+cJOkEiA==}
engines: {node: '>=10'}
append-field@1.0.0:
resolution: {integrity: sha512-klpgFSWLW1ZEs8svjfb7g4qWY0YS5imI82dTg+QahUvJ8YqAY0P10Uk8tTyh9ZGuYEZEMaeJYCF5BFuX552hsw==}
@ -3658,6 +3679,9 @@ packages:
resolution: {integrity: sha512-ik3ZgC9dY/lYVVM++OISsaYDeg1tb0VtP5uL3ouh1koGOaUMDPpbFIei4JkFimWUFPn90sbMNMXQAIVOlnYKJA==}
engines: {node: '>=10'}
aria-query@5.3.0:
resolution: {integrity: sha512-b0P0sZPKtyu8HkeRAfCq0IfURZK+SuwMjY1UXGBU27wpAiTwQAIlq56IbIO+ytk/JjS1fMR14ee5WBBfKi5J6A==}
asap@2.0.6:
resolution: {integrity: sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==}
@ -4224,6 +4248,9 @@ packages:
resolution: {integrity: sha512-DPi0FmjiSU5EvQV0++GFDOJ9ASQUVFh5kD+OzOnYdi7n3Wpm9hWWGfB/O2blfHcMVTL5WkQXSnRiK9makhrcnw==}
engines: {node: '>=0.3.1'}
dom-accessibility-api@0.5.16:
resolution: {integrity: sha512-X7BJ2yElsnOJ30pZF4uIIDfBEVgF4XEBxL9Bxhy6dnrm5hkzqmsWHGTiHqRiITNhMyFLyAiWndIJP7Z1NTteDg==}
dompurify@3.3.2:
resolution: {integrity: sha512-6obghkliLdmKa56xdbLOpUZ43pAR6xFy1uOrxBaIDjT+yaRuuybLjGS9eVBoSR/UPU5fq3OXClEHLJNGvbxKpQ==}
engines: {node: '>=20'}
@ -5359,6 +5386,10 @@ packages:
resolution: {integrity: sha512-dM0jVuXJPsDN6DvRpea484tCUaMiXWjuCn++HGTqUWzGDjv5tZkEZldAJ/UMlqRYGFrD/etByo4/xOuC/snX2A==}
engines: {node: '>=20'}
pretty-format@27.5.1:
resolution: {integrity: sha512-Qb1gy5OrP5+zDf2Bvnzdl3jsTf1qXVMazbvCoKhtKqVs4/YK4ozX4gKQJJVyNe+cajNPn0KoC0MC3FUmaHWEmQ==}
engines: {node: ^10.13.0 || ^12.13.0 || ^14.15.0 || >=15.0.0}
prismjs@1.30.0:
resolution: {integrity: sha512-DEvV2ZF2r2/63V+tK8hQvrR2ZGn10srHbXviTlcv7Kpzw8jWiNTqbVgjO3IY8RxrrOUF8VPMQQFysYYYv0YZxw==}
engines: {node: '>=6'}
@ -9299,13 +9330,28 @@ snapshots:
'@tanstack/query-core': 5.90.20
react: 19.2.4
'@tanstack/react-virtual@3.13.23(react-dom@19.2.4(react@19.2.4))(react@19.2.4)':
'@testing-library/dom@10.4.1':
dependencies:
'@tanstack/virtual-core': 3.13.23
'@babel/code-frame': 7.29.0
'@babel/runtime': 7.28.6
'@types/aria-query': 5.0.4
aria-query: 5.3.0
dom-accessibility-api: 0.5.16
lz-string: 1.5.0
picocolors: 1.1.1
pretty-format: 27.5.1
'@testing-library/react@16.3.2(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)':
dependencies:
'@babel/runtime': 7.28.6
'@testing-library/dom': 10.4.1
react: 19.2.4
react-dom: 19.2.4(react@19.2.4)
optionalDependencies:
'@types/react': 19.2.14
'@types/react-dom': 19.2.3(@types/react@19.2.14)
'@tanstack/virtual-core@3.13.23': {}
'@types/aria-query@5.0.4': {}
'@types/babel__core@7.20.5':
dependencies:
@ -9673,6 +9719,10 @@ snapshots:
anser@2.3.5: {}
ansi-regex@5.0.1: {}
ansi-styles@5.2.0: {}
append-field@1.0.0: {}
argparse@2.0.1: {}
@ -9681,6 +9731,10 @@ snapshots:
dependencies:
tslib: 2.8.1
aria-query@5.3.0:
dependencies:
dequal: 2.0.3
asap@2.0.6: {}
assertion-error@2.0.1: {}
@ -10208,6 +10262,8 @@ snapshots:
diff@8.0.4: {}
dom-accessibility-api@0.5.16: {}
dompurify@3.3.2:
optionalDependencies:
'@types/trusted-types': 2.0.7
@ -11679,6 +11735,12 @@ snapshots:
powershell-utils@0.1.0: {}
pretty-format@27.5.1:
dependencies:
ansi-regex: 5.0.1
ansi-styles: 5.2.0
react-is: 17.0.2
prismjs@1.30.0: {}
process-warning@5.0.0: {}

View file

@ -72,6 +72,7 @@
"tailwindcss": "^4.0.7",
"typescript": "^5.7.3",
"vite": "^6.1.0",
"@testing-library/react": "^16.0.0",
"vitest": "^3.0.5"
}
}

View file

@ -54,4 +54,56 @@ export const chatApi = {
) {
return api.post<ChatMessage>(`/conversations/${conversationId}/messages`, data);
},
async postMessageAndStream(
conversationId: string,
data: { content: string; agentId?: string },
callbacks: {
onToken: (token: string) => void;
onDone: (messageId: string, content: string) => void;
onError: (error: string) => void;
},
signal?: AbortSignal,
) {
const res = await fetch(`/api/conversations/${conversationId}/stream`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
credentials: "include",
signal,
});
if (!res.ok || !res.body) {
callbacks.onError("Failed to start stream");
return;
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const json = line.slice(6);
try {
const parsed = JSON.parse(json) as { token?: string; done?: boolean; messageId?: string; content?: string; error?: string };
if (parsed.token) callbacks.onToken(parsed.token);
if (parsed.done && parsed.messageId) callbacks.onDone(parsed.messageId, parsed.content ?? "");
if (parsed.error) callbacks.onError(parsed.error);
} catch { /* ignore malformed lines */ }
}
}
} catch (err) {
if (signal?.aborted) return; // Expected on stop
callbacks.onError("Stream connection lost");
}
},
async savePartialMessage(conversationId: string, data: { role: "assistant"; content: string; agentId?: string }) {
return chatApi.postMessage(conversationId, data);
},
};

View file

@ -1,9 +1,170 @@
import { describe, it } from "vitest";
// @vitest-environment jsdom
import { describe, it, expect, vi, beforeEach } from "vitest";
import { renderHook, act } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { createElement } from "react";
import { useStreamingChat } from "./useStreamingChat";
import { chatApi } from "../api/chat";
// Mock chatApi
vi.mock("../api/chat", () => ({
chatApi: {
postMessageAndStream: vi.fn(),
savePartialMessage: vi.fn().mockResolvedValue({}),
postMessage: vi.fn().mockResolvedValue({}),
},
}));
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
});
return ({ children }: { children: React.ReactNode }) =>
createElement(QueryClientProvider, { client: queryClient }, children);
}
describe("useStreamingChat", () => {
it.todo("accumulates tokens from SSE data events into streamingContent");
it.todo("sets isStreaming=true when stream starts, false when done");
it.todo("clears streamingContent and invalidates query cache on done event");
it.todo("stop() closes the EventSource and sets isStreaming=false");
it.todo("handles SSE error event by closing connection");
beforeEach(() => {
vi.clearAllMocks();
});
it("accumulates tokens from onToken callbacks into streamingContent", async () => {
// Mock postMessageAndStream to capture callbacks and simulate tokens
let capturedCallbacks: {
onToken: (token: string) => void;
onDone: (messageId: string, content: string) => void;
onError: (error: string) => void;
};
vi.mocked(chatApi.postMessageAndStream).mockImplementation(
(_convId, _data, callbacks, _signal) => {
capturedCallbacks = callbacks;
return Promise.resolve();
},
);
const { result } = renderHook(
() => useStreamingChat("conv-1"),
{ wrapper: createWrapper() },
);
// Start stream
act(() => {
result.current.startStream("hello world");
});
expect(result.current.isStreaming).toBe(true);
expect(result.current.streamingContent).toBe("");
// Simulate tokens arriving
act(() => {
capturedCallbacks!.onToken("Hello ");
});
expect(result.current.streamingContent).toBe("Hello ");
act(() => {
capturedCallbacks!.onToken("world!");
});
expect(result.current.streamingContent).toBe("Hello world!");
});
it("sets isStreaming=true when stream starts, false on done", async () => {
let capturedCallbacks: {
onToken: (token: string) => void;
onDone: (messageId: string, content: string) => void;
onError: (error: string) => void;
};
vi.mocked(chatApi.postMessageAndStream).mockImplementation(
(_convId, _data, callbacks, _signal) => {
capturedCallbacks = callbacks;
return Promise.resolve();
},
);
const { result } = renderHook(
() => useStreamingChat("conv-1"),
{ wrapper: createWrapper() },
);
expect(result.current.isStreaming).toBe(false);
act(() => {
result.current.startStream("test");
});
expect(result.current.isStreaming).toBe(true);
act(() => {
capturedCallbacks!.onDone("msg-1", "test response");
});
expect(result.current.isStreaming).toBe(false);
expect(result.current.streamingContent).toBe("");
});
it("stop() aborts the controller and sets isStreaming=false", async () => {
let capturedSignal: AbortSignal | undefined;
vi.mocked(chatApi.postMessageAndStream).mockImplementation(
(_convId, _data, _callbacks, signal) => {
capturedSignal = signal;
return Promise.resolve();
},
);
const { result } = renderHook(
() => useStreamingChat("conv-1"),
{ wrapper: createWrapper() },
);
act(() => {
result.current.startStream("test");
});
expect(result.current.isStreaming).toBe(true);
expect(capturedSignal?.aborted).toBe(false);
act(() => {
result.current.stop();
});
expect(result.current.isStreaming).toBe(false);
expect(capturedSignal?.aborted).toBe(true);
});
it("handles SSE error by setting isStreaming=false", async () => {
let capturedCallbacks: {
onToken: (token: string) => void;
onDone: (messageId: string, content: string) => void;
onError: (error: string) => void;
};
vi.mocked(chatApi.postMessageAndStream).mockImplementation(
(_convId, _data, callbacks, _signal) => {
capturedCallbacks = callbacks;
return Promise.resolve();
},
);
const { result } = renderHook(
() => useStreamingChat("conv-1"),
{ wrapper: createWrapper() },
);
act(() => {
result.current.startStream("test");
});
expect(result.current.isStreaming).toBe(true);
act(() => {
capturedCallbacks!.onError("Stream error");
});
expect(result.current.isStreaming).toBe(false);
});
it("does nothing when conversationId is null", () => {
const { result } = renderHook(
() => useStreamingChat(null),
{ wrapper: createWrapper() },
);
act(() => {
result.current.startStream("test");
});
expect(chatApi.postMessageAndStream).not.toHaveBeenCalled();
expect(result.current.isStreaming).toBe(false);
});
});

View file

@ -0,0 +1,73 @@
import { useRef, useState, useTransition, useCallback } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { chatApi } from "../api/chat";
export function useStreamingChat(conversationId: string | null) {
const [streamingContent, setStreamingContent] = useState<string>("");
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);
const queryClient = useQueryClient();
const [, startTransition] = useTransition();
const startStream = useCallback(
(userMessage: string, agentId?: string) => {
if (!conversationId) return;
setIsStreaming(true);
setStreamingContent("");
const abort = new AbortController();
abortRef.current = abort;
chatApi.postMessageAndStream(
conversationId,
{ content: userMessage, agentId },
{
onToken: (token: string) => {
startTransition(() => {
setStreamingContent((prev) => prev + token);
});
},
onDone: (_messageId: string, _content: string) => {
setIsStreaming(false);
setStreamingContent("");
abortRef.current = null;
// Optimistically insert the completed message into cache to avoid flash (Pitfall 2)
queryClient.setQueryData(
["chat", "messages", conversationId],
(old: unknown) => old, // Keep existing data -- invalidation will refetch
);
queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] });
queryClient.invalidateQueries({ queryKey: ["chat", "conversations"] });
},
onError: (error: string) => {
setIsStreaming(false);
abortRef.current = null;
console.error("[useStreamingChat] Stream error:", error);
},
},
abort.signal,
);
},
[conversationId, queryClient, startTransition],
);
const stop = useCallback(() => {
abortRef.current?.abort();
abortRef.current = null;
const partial = streamingContent;
setIsStreaming(false);
setStreamingContent("");
// Persist partial content with [stopped] suffix (Open Question 3)
if (conversationId && partial.trim()) {
chatApi.savePartialMessage(conversationId, {
role: "assistant",
content: partial.trim() + " [stopped]",
}).then(() => {
queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] });
});
}
}, [conversationId, streamingContent, queryClient]);
return { streamingContent, isStreaming, startStream, stop };
}