diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1d196cdd..cf5d7035 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -645,9 +645,6 @@ importers: '@tanstack/react-query': specifier: ^5.90.21 version: 5.90.21(react@19.2.4) - '@tanstack/react-virtual': - specifier: ^3.13.23 - version: 3.13.23(react-dom@19.2.4(react@19.2.4))(react@19.2.4) class-variance-authority: specifier: ^0.7.1 version: 0.7.1 @@ -700,6 +697,9 @@ importers: '@tailwindcss/vite': specifier: ^4.0.7 version: 4.1.18(vite@6.4.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(tsx@4.21.0)) + '@testing-library/react': + specifier: ^16.0.0 + version: 16.3.2(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) '@types/diff': specifier: ^8.0.0 version: 8.0.0 @@ -3345,14 +3345,27 @@ packages: peerDependencies: react: ^18 || ^19 - '@tanstack/react-virtual@3.13.23': - resolution: {integrity: sha512-XnMRnHQ23piOVj2bzJqHrRrLg4r+F86fuBcwteKfbIjJrtGxb4z7tIvPVAe4B+4UVwo9G4Giuz5fmapcrnZ0OQ==} - peerDependencies: - react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 - react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + '@testing-library/dom@10.4.1': + resolution: {integrity: sha512-o4PXJQidqJl82ckFaXUeoAW+XysPLauYI43Abki5hABd853iMhitooc6znOnczgbTYmEP6U6/y1ZyKAIsvMKGg==} + engines: {node: '>=18'} - '@tanstack/virtual-core@3.13.23': - resolution: {integrity: sha512-zSz2Z2HNyLjCplANTDyl3BcdQJc2k1+yyFoKhNRmCr7V7dY8o8q5m8uFTI1/Pg1kL+Hgrz6u3Xo6eFUB7l66cg==} + '@testing-library/react@16.3.2': + resolution: {integrity: sha512-XU5/SytQM+ykqMnAnvB2umaJNIOsLF3PVv//1Ew4CTcpz0/BRyy/af40qqrt7SjKpDdT1saBMc42CUok5gaw+g==} + engines: {node: '>=18'} + peerDependencies: + '@testing-library/dom': ^10.0.0 + '@types/react': ^18.0.0 || ^19.0.0 + '@types/react-dom': ^18.0.0 || ^19.0.0 + react: ^18.0.0 || ^19.0.0 + react-dom: ^18.0.0 || ^19.0.0 + peerDependenciesMeta: + '@types/react': + optional: true + '@types/react-dom': + optional: true + + '@types/aria-query@5.0.4': + resolution: {integrity: sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw==} '@types/babel__core@7.20.5': resolution: {integrity: sha512-qoQprZvz5wQFJwMDqeseRXWv3rqMvhgpbXFfVyWhbx9X47POIA6i/+dXefEmZKoAgOaTdaIgNSMqMIU61yRyzA==} @@ -3648,6 +3661,14 @@ packages: anser@2.3.5: resolution: {integrity: sha512-vcZjxvvVoxTeR5XBNJB38oTu/7eDCZlwdz32N1eNgpyPF7j/Z7Idf+CUwQOkKKpJ7RJyjxgLHCM7vdIK0iCNMQ==} + ansi-regex@5.0.1: + resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==} + engines: {node: '>=8'} + + ansi-styles@5.2.0: + resolution: {integrity: sha512-Cxwpt2SfTzTtXcfOlzGEee8O+c+MmUgGrNiBcXnuWxuFJHe6a5Hz7qwhwe5OgaSYI0IJvkLqWX1ASG+cJOkEiA==} + engines: {node: '>=10'} + append-field@1.0.0: resolution: {integrity: sha512-klpgFSWLW1ZEs8svjfb7g4qWY0YS5imI82dTg+QahUvJ8YqAY0P10Uk8tTyh9ZGuYEZEMaeJYCF5BFuX552hsw==} @@ -3658,6 +3679,9 @@ packages: resolution: {integrity: sha512-ik3ZgC9dY/lYVVM++OISsaYDeg1tb0VtP5uL3ouh1koGOaUMDPpbFIei4JkFimWUFPn90sbMNMXQAIVOlnYKJA==} engines: {node: '>=10'} + aria-query@5.3.0: + resolution: {integrity: sha512-b0P0sZPKtyu8HkeRAfCq0IfURZK+SuwMjY1UXGBU27wpAiTwQAIlq56IbIO+ytk/JjS1fMR14ee5WBBfKi5J6A==} + asap@2.0.6: resolution: {integrity: sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==} @@ -4224,6 +4248,9 @@ packages: resolution: {integrity: sha512-DPi0FmjiSU5EvQV0++GFDOJ9ASQUVFh5kD+OzOnYdi7n3Wpm9hWWGfB/O2blfHcMVTL5WkQXSnRiK9makhrcnw==} engines: {node: '>=0.3.1'} + dom-accessibility-api@0.5.16: + resolution: {integrity: sha512-X7BJ2yElsnOJ30pZF4uIIDfBEVgF4XEBxL9Bxhy6dnrm5hkzqmsWHGTiHqRiITNhMyFLyAiWndIJP7Z1NTteDg==} + dompurify@3.3.2: resolution: {integrity: sha512-6obghkliLdmKa56xdbLOpUZ43pAR6xFy1uOrxBaIDjT+yaRuuybLjGS9eVBoSR/UPU5fq3OXClEHLJNGvbxKpQ==} engines: {node: '>=20'} @@ -5359,6 +5386,10 @@ packages: resolution: {integrity: sha512-dM0jVuXJPsDN6DvRpea484tCUaMiXWjuCn++HGTqUWzGDjv5tZkEZldAJ/UMlqRYGFrD/etByo4/xOuC/snX2A==} engines: {node: '>=20'} + pretty-format@27.5.1: + resolution: {integrity: sha512-Qb1gy5OrP5+zDf2Bvnzdl3jsTf1qXVMazbvCoKhtKqVs4/YK4ozX4gKQJJVyNe+cajNPn0KoC0MC3FUmaHWEmQ==} + engines: {node: ^10.13.0 || ^12.13.0 || ^14.15.0 || >=15.0.0} + prismjs@1.30.0: resolution: {integrity: sha512-DEvV2ZF2r2/63V+tK8hQvrR2ZGn10srHbXviTlcv7Kpzw8jWiNTqbVgjO3IY8RxrrOUF8VPMQQFysYYYv0YZxw==} engines: {node: '>=6'} @@ -9299,13 +9330,28 @@ snapshots: '@tanstack/query-core': 5.90.20 react: 19.2.4 - '@tanstack/react-virtual@3.13.23(react-dom@19.2.4(react@19.2.4))(react@19.2.4)': + '@testing-library/dom@10.4.1': dependencies: - '@tanstack/virtual-core': 3.13.23 + '@babel/code-frame': 7.29.0 + '@babel/runtime': 7.28.6 + '@types/aria-query': 5.0.4 + aria-query: 5.3.0 + dom-accessibility-api: 0.5.16 + lz-string: 1.5.0 + picocolors: 1.1.1 + pretty-format: 27.5.1 + + '@testing-library/react@16.3.2(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.4(react@19.2.4))(react@19.2.4)': + dependencies: + '@babel/runtime': 7.28.6 + '@testing-library/dom': 10.4.1 react: 19.2.4 react-dom: 19.2.4(react@19.2.4) + optionalDependencies: + '@types/react': 19.2.14 + '@types/react-dom': 19.2.3(@types/react@19.2.14) - '@tanstack/virtual-core@3.13.23': {} + '@types/aria-query@5.0.4': {} '@types/babel__core@7.20.5': dependencies: @@ -9673,6 +9719,10 @@ snapshots: anser@2.3.5: {} + ansi-regex@5.0.1: {} + + ansi-styles@5.2.0: {} + append-field@1.0.0: {} argparse@2.0.1: {} @@ -9681,6 +9731,10 @@ snapshots: dependencies: tslib: 2.8.1 + aria-query@5.3.0: + dependencies: + dequal: 2.0.3 + asap@2.0.6: {} assertion-error@2.0.1: {} @@ -10208,6 +10262,8 @@ snapshots: diff@8.0.4: {} + dom-accessibility-api@0.5.16: {} + dompurify@3.3.2: optionalDependencies: '@types/trusted-types': 2.0.7 @@ -11679,6 +11735,12 @@ snapshots: powershell-utils@0.1.0: {} + pretty-format@27.5.1: + dependencies: + ansi-regex: 5.0.1 + ansi-styles: 5.2.0 + react-is: 17.0.2 + prismjs@1.30.0: {} process-warning@5.0.0: {} diff --git a/ui/package.json b/ui/package.json index 91a4ded3..575b40ce 100644 --- a/ui/package.json +++ b/ui/package.json @@ -72,6 +72,7 @@ "tailwindcss": "^4.0.7", "typescript": "^5.7.3", "vite": "^6.1.0", + "@testing-library/react": "^16.0.0", "vitest": "^3.0.5" } } diff --git a/ui/src/api/chat.ts b/ui/src/api/chat.ts index 35377024..c8f0c5fe 100644 --- a/ui/src/api/chat.ts +++ b/ui/src/api/chat.ts @@ -54,4 +54,56 @@ export const chatApi = { ) { return api.post(`/conversations/${conversationId}/messages`, data); }, + + async postMessageAndStream( + conversationId: string, + data: { content: string; agentId?: string }, + callbacks: { + onToken: (token: string) => void; + onDone: (messageId: string, content: string) => void; + onError: (error: string) => void; + }, + signal?: AbortSignal, + ) { + const res = await fetch(`/api/conversations/${conversationId}/stream`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(data), + credentials: "include", + signal, + }); + if (!res.ok || !res.body) { + callbacks.onError("Failed to start stream"); + return; + } + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + const json = line.slice(6); + try { + const parsed = JSON.parse(json) as { token?: string; done?: boolean; messageId?: string; content?: string; error?: string }; + if (parsed.token) callbacks.onToken(parsed.token); + if (parsed.done && parsed.messageId) callbacks.onDone(parsed.messageId, parsed.content ?? ""); + if (parsed.error) callbacks.onError(parsed.error); + } catch { /* ignore malformed lines */ } + } + } + } catch (err) { + if (signal?.aborted) return; // Expected on stop + callbacks.onError("Stream connection lost"); + } + }, + + async savePartialMessage(conversationId: string, data: { role: "assistant"; content: string; agentId?: string }) { + return chatApi.postMessage(conversationId, data); + }, }; diff --git a/ui/src/hooks/useStreamingChat.test.ts b/ui/src/hooks/useStreamingChat.test.ts index 26f770b5..15178db1 100644 --- a/ui/src/hooks/useStreamingChat.test.ts +++ b/ui/src/hooks/useStreamingChat.test.ts @@ -1,9 +1,170 @@ -import { describe, it } from "vitest"; +// @vitest-environment jsdom +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { renderHook, act } from "@testing-library/react"; +import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import { createElement } from "react"; +import { useStreamingChat } from "./useStreamingChat"; +import { chatApi } from "../api/chat"; + +// Mock chatApi +vi.mock("../api/chat", () => ({ + chatApi: { + postMessageAndStream: vi.fn(), + savePartialMessage: vi.fn().mockResolvedValue({}), + postMessage: vi.fn().mockResolvedValue({}), + }, +})); + +function createWrapper() { + const queryClient = new QueryClient({ + defaultOptions: { queries: { retry: false } }, + }); + return ({ children }: { children: React.ReactNode }) => + createElement(QueryClientProvider, { client: queryClient }, children); +} describe("useStreamingChat", () => { - it.todo("accumulates tokens from SSE data events into streamingContent"); - it.todo("sets isStreaming=true when stream starts, false when done"); - it.todo("clears streamingContent and invalidates query cache on done event"); - it.todo("stop() closes the EventSource and sets isStreaming=false"); - it.todo("handles SSE error event by closing connection"); + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("accumulates tokens from onToken callbacks into streamingContent", async () => { + // Mock postMessageAndStream to capture callbacks and simulate tokens + let capturedCallbacks: { + onToken: (token: string) => void; + onDone: (messageId: string, content: string) => void; + onError: (error: string) => void; + }; + vi.mocked(chatApi.postMessageAndStream).mockImplementation( + (_convId, _data, callbacks, _signal) => { + capturedCallbacks = callbacks; + return Promise.resolve(); + }, + ); + + const { result } = renderHook( + () => useStreamingChat("conv-1"), + { wrapper: createWrapper() }, + ); + + // Start stream + act(() => { + result.current.startStream("hello world"); + }); + + expect(result.current.isStreaming).toBe(true); + expect(result.current.streamingContent).toBe(""); + + // Simulate tokens arriving + act(() => { + capturedCallbacks!.onToken("Hello "); + }); + expect(result.current.streamingContent).toBe("Hello "); + + act(() => { + capturedCallbacks!.onToken("world!"); + }); + expect(result.current.streamingContent).toBe("Hello world!"); + }); + + it("sets isStreaming=true when stream starts, false on done", async () => { + let capturedCallbacks: { + onToken: (token: string) => void; + onDone: (messageId: string, content: string) => void; + onError: (error: string) => void; + }; + vi.mocked(chatApi.postMessageAndStream).mockImplementation( + (_convId, _data, callbacks, _signal) => { + capturedCallbacks = callbacks; + return Promise.resolve(); + }, + ); + + const { result } = renderHook( + () => useStreamingChat("conv-1"), + { wrapper: createWrapper() }, + ); + + expect(result.current.isStreaming).toBe(false); + + act(() => { + result.current.startStream("test"); + }); + expect(result.current.isStreaming).toBe(true); + + act(() => { + capturedCallbacks!.onDone("msg-1", "test response"); + }); + expect(result.current.isStreaming).toBe(false); + expect(result.current.streamingContent).toBe(""); + }); + + it("stop() aborts the controller and sets isStreaming=false", async () => { + let capturedSignal: AbortSignal | undefined; + vi.mocked(chatApi.postMessageAndStream).mockImplementation( + (_convId, _data, _callbacks, signal) => { + capturedSignal = signal; + return Promise.resolve(); + }, + ); + + const { result } = renderHook( + () => useStreamingChat("conv-1"), + { wrapper: createWrapper() }, + ); + + act(() => { + result.current.startStream("test"); + }); + expect(result.current.isStreaming).toBe(true); + expect(capturedSignal?.aborted).toBe(false); + + act(() => { + result.current.stop(); + }); + expect(result.current.isStreaming).toBe(false); + expect(capturedSignal?.aborted).toBe(true); + }); + + it("handles SSE error by setting isStreaming=false", async () => { + let capturedCallbacks: { + onToken: (token: string) => void; + onDone: (messageId: string, content: string) => void; + onError: (error: string) => void; + }; + vi.mocked(chatApi.postMessageAndStream).mockImplementation( + (_convId, _data, callbacks, _signal) => { + capturedCallbacks = callbacks; + return Promise.resolve(); + }, + ); + + const { result } = renderHook( + () => useStreamingChat("conv-1"), + { wrapper: createWrapper() }, + ); + + act(() => { + result.current.startStream("test"); + }); + expect(result.current.isStreaming).toBe(true); + + act(() => { + capturedCallbacks!.onError("Stream error"); + }); + expect(result.current.isStreaming).toBe(false); + }); + + it("does nothing when conversationId is null", () => { + const { result } = renderHook( + () => useStreamingChat(null), + { wrapper: createWrapper() }, + ); + + act(() => { + result.current.startStream("test"); + }); + expect(chatApi.postMessageAndStream).not.toHaveBeenCalled(); + expect(result.current.isStreaming).toBe(false); + }); }); diff --git a/ui/src/hooks/useStreamingChat.ts b/ui/src/hooks/useStreamingChat.ts new file mode 100644 index 00000000..85b24228 --- /dev/null +++ b/ui/src/hooks/useStreamingChat.ts @@ -0,0 +1,73 @@ +import { useRef, useState, useTransition, useCallback } from "react"; +import { useQueryClient } from "@tanstack/react-query"; +import { chatApi } from "../api/chat"; + +export function useStreamingChat(conversationId: string | null) { + const [streamingContent, setStreamingContent] = useState(""); + const [isStreaming, setIsStreaming] = useState(false); + const abortRef = useRef(null); + const queryClient = useQueryClient(); + const [, startTransition] = useTransition(); + + const startStream = useCallback( + (userMessage: string, agentId?: string) => { + if (!conversationId) return; + + setIsStreaming(true); + setStreamingContent(""); + + const abort = new AbortController(); + abortRef.current = abort; + + chatApi.postMessageAndStream( + conversationId, + { content: userMessage, agentId }, + { + onToken: (token: string) => { + startTransition(() => { + setStreamingContent((prev) => prev + token); + }); + }, + onDone: (_messageId: string, _content: string) => { + setIsStreaming(false); + setStreamingContent(""); + abortRef.current = null; + // Optimistically insert the completed message into cache to avoid flash (Pitfall 2) + queryClient.setQueryData( + ["chat", "messages", conversationId], + (old: unknown) => old, // Keep existing data -- invalidation will refetch + ); + queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] }); + queryClient.invalidateQueries({ queryKey: ["chat", "conversations"] }); + }, + onError: (error: string) => { + setIsStreaming(false); + abortRef.current = null; + console.error("[useStreamingChat] Stream error:", error); + }, + }, + abort.signal, + ); + }, + [conversationId, queryClient, startTransition], + ); + + const stop = useCallback(() => { + abortRef.current?.abort(); + abortRef.current = null; + const partial = streamingContent; + setIsStreaming(false); + setStreamingContent(""); + // Persist partial content with [stopped] suffix (Open Question 3) + if (conversationId && partial.trim()) { + chatApi.savePartialMessage(conversationId, { + role: "assistant", + content: partial.trim() + " [stopped]", + }).then(() => { + queryClient.invalidateQueries({ queryKey: ["chat", "messages", conversationId] }); + }); + } + }, [conversationId, streamingContent, queryClient]); + + return { streamingContent, isStreaming, startStream, stop }; +}