397 lines
18 KiB
Markdown
397 lines
18 KiB
Markdown
---
|
|
phase: 22-agent-streaming
|
|
plan: 01
|
|
type: execute
|
|
wave: 1
|
|
depends_on: []
|
|
files_modified:
|
|
- packages/db/src/schema/chat_messages.ts
|
|
- packages/db/src/migrations/TBD_agent_streaming.sql
|
|
- packages/shared/src/types/chat.ts
|
|
- packages/shared/src/validators/chat.ts
|
|
- server/src/services/chat.ts
|
|
- server/src/routes/chat.ts
|
|
- server/src/__tests__/chat-stream-routes.test.ts
|
|
- server/src/__tests__/chat-routes.test.ts
|
|
autonomous: true
|
|
requirements: [CHAT-01, CHAT-08, CHAT-10, CHAT-12, PERF-02]
|
|
|
|
must_haves:
|
|
truths:
|
|
- "POST user message then GET /conversations/:id/stream returns text/event-stream with token events followed by a done event"
|
|
- "PATCH /conversations/:id accepts agentId field and persists it"
|
|
- "PUT /conversations/:id/messages/:messageId updates editedContent and editedAt"
|
|
- "SSE stream sets X-Accel-Buffering: no and flushes headers immediately for sub-100ms latency"
|
|
- "Client disconnect causes server to stop streaming (abort detection)"
|
|
artifacts:
|
|
- path: "server/src/routes/chat.ts"
|
|
provides: "SSE stream endpoint, edit message route, updateConversation with agentId"
|
|
exports: ["chatRoutes"]
|
|
- path: "server/src/services/chat.ts"
|
|
provides: "editMessage, getMessageHistory, updateConversationAgent"
|
|
exports: ["chatService"]
|
|
- path: "packages/db/src/schema/chat_messages.ts"
|
|
provides: "editedContent and editedAt columns"
|
|
contains: "editedContent"
|
|
- path: "packages/shared/src/types/chat.ts"
|
|
provides: "Updated ChatMessage with editedContent, editedAt"
|
|
contains: "editedContent"
|
|
- path: "packages/shared/src/validators/chat.ts"
|
|
provides: "streamMessageSchema, editMessageSchema, updateConversationSchema with agentId"
|
|
contains: "streamMessageSchema"
|
|
- path: "server/src/__tests__/chat-stream-routes.test.ts"
|
|
provides: "SSE streaming tests"
|
|
key_links:
|
|
- from: "server/src/routes/chat.ts"
|
|
to: "server/src/services/chat.ts"
|
|
via: "svc.addMessage, svc.editMessage, svc.getMessageHistory"
|
|
pattern: "svc\\.(addMessage|editMessage|getMessageHistory)"
|
|
- from: "server/src/routes/chat.ts"
|
|
to: "packages/shared/src/validators/chat.ts"
|
|
via: "validate(streamMessageSchema)"
|
|
pattern: "validate\\(streamMessageSchema\\)"
|
|
---
|
|
|
|
<objective>
|
|
Server-side streaming infrastructure: DB schema additions for message editing, SSE streaming endpoint with echo-stream placeholder, message edit route, agent selection on conversations, and server tests.
|
|
|
|
Purpose: Establishes the entire server-side API surface that the UI plans (02/03) will consume. Every new endpoint is tested.
|
|
Output: Working SSE stream endpoint (echo-stream mode), edit message endpoint, conversation agent update, migration SQL, tests.
|
|
|
|
NOTE -- Echo-stream scope (CHAT-01 partial): The SSE endpoint uses an echo-stream that replays the
|
|
user's last message word-by-word. This fully exercises the streaming pipeline (SSE headers, token
|
|
events, done event, abort detection, message persistence) so the UI can be built and tested against
|
|
real streaming behavior. Real LLM integration (replacing the echo loop with an adapter call) is
|
|
Phase 23 (Brainstormer agent). The echo-stream satisfies CHAT-01's "tokens appear as generated"
|
|
contract at the transport level; Phase 23 provides semantic content.
|
|
</objective>
|
|
|
|
<execution_context>
|
|
@$HOME/.claude/get-shit-done/workflows/execute-plan.md
|
|
@$HOME/.claude/get-shit-done/templates/summary.md
|
|
</execution_context>
|
|
|
|
<context>
|
|
@.planning/PROJECT.md
|
|
@.planning/ROADMAP.md
|
|
@.planning/STATE.md
|
|
@.planning/phases/22-agent-streaming/22-RESEARCH.md
|
|
|
|
<interfaces>
|
|
<!-- Key types and contracts the executor needs. Extracted from codebase. -->
|
|
|
|
From packages/shared/src/types/chat.ts:
|
|
```typescript
|
|
export interface ChatConversation {
|
|
id: string;
|
|
companyId: string;
|
|
title: string | null;
|
|
agentId: string | null;
|
|
pinnedAt: string | null;
|
|
archivedAt: string | null;
|
|
deletedAt: string | null;
|
|
createdAt: string;
|
|
updatedAt: string;
|
|
}
|
|
|
|
export interface ChatMessage {
|
|
id: string;
|
|
conversationId: string;
|
|
role: "user" | "assistant" | "system";
|
|
content: string;
|
|
agentId: string | null;
|
|
createdAt: string;
|
|
}
|
|
```
|
|
|
|
From packages/shared/src/validators/chat.ts:
|
|
```typescript
|
|
export const createConversationSchema = z.object({ title: z.string().max(200).optional() });
|
|
export const updateConversationSchema = z.object({ title: z.string().max(200).optional() });
|
|
export const createMessageSchema = z.object({
|
|
role: z.enum(["user", "assistant", "system"]),
|
|
content: z.string().min(1),
|
|
agentId: z.string().uuid().optional().nullable(),
|
|
});
|
|
```
|
|
|
|
From packages/db/src/schema/chat_messages.ts:
|
|
```typescript
|
|
export const chatMessages = pgTable("chat_messages", {
|
|
id: uuid("id").primaryKey().defaultRandom(),
|
|
conversationId: uuid("conversation_id").notNull().references(() => chatConversations.id, { onDelete: "cascade" }),
|
|
role: text("role").notNull(),
|
|
content: text("content").notNull(),
|
|
agentId: uuid("agent_id"),
|
|
createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
|
|
}, ...);
|
|
```
|
|
|
|
From server/src/services/chat.ts:
|
|
```typescript
|
|
export function chatService(db: Db) {
|
|
// Returns object with: listConversations, createConversation, getConversation,
|
|
// updateConversation, softDeleteConversation, archiveConversation, unarchiveConversation,
|
|
// pinConversation, unpinConversation, listMessages, addMessage
|
|
}
|
|
```
|
|
|
|
From server/src/routes/chat.ts:
|
|
```typescript
|
|
export function chatRoutes(db: Db) {
|
|
// Mounts all routes on a Router. Key: PATCH /conversations/:id uses validate(updateConversationSchema)
|
|
}
|
|
```
|
|
|
|
SSE pattern from server/src/routes/plugins.ts:1146:
|
|
```typescript
|
|
res.writeHead(200, {
|
|
"Content-Type": "text/event-stream",
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"X-Accel-Buffering": "no",
|
|
});
|
|
res.flushHeaders();
|
|
res.write(":ok\n\n");
|
|
```
|
|
</interfaces>
|
|
</context>
|
|
|
|
<tasks>
|
|
|
|
<task type="auto" tdd="true">
|
|
<name>Task 1: DB migration + shared types + validators + service methods for streaming and editing</name>
|
|
<files>
|
|
packages/db/src/schema/chat_messages.ts,
|
|
packages/shared/src/types/chat.ts,
|
|
packages/shared/src/validators/chat.ts,
|
|
server/src/services/chat.ts,
|
|
server/src/__tests__/chat-routes.test.ts
|
|
</files>
|
|
<read_first>
|
|
packages/db/src/schema/chat_messages.ts,
|
|
packages/db/src/schema/chat_conversations.ts,
|
|
packages/shared/src/types/chat.ts,
|
|
packages/shared/src/validators/chat.ts,
|
|
server/src/services/chat.ts,
|
|
server/src/__tests__/chat-routes.test.ts
|
|
</read_first>
|
|
<behavior>
|
|
- Test: editMessage(messageId, { content }) updates the message's editedContent and editedAt, returns updated row
|
|
- Test: getMessageHistory(conversationId) returns all messages in ascending createdAt order (for LLM context)
|
|
- Test: updateConversation with agentId field persists the agentId on the conversation
|
|
- Test: PATCH /conversations/:id with { agentId: "uuid" } returns 200 with updated conversation
|
|
- Test: PUT /conversations/:id/messages/:messageId with { content: "new" } returns 200 with editedContent set
|
|
</behavior>
|
|
<action>
|
|
1. **DB schema** -- Add two columns to `chatMessages` in `packages/db/src/schema/chat_messages.ts`:
|
|
```typescript
|
|
editedContent: text("edited_content"),
|
|
editedAt: timestamp("edited_at", { withTimezone: true }),
|
|
```
|
|
Then run `pnpm db:generate` to create the migration SQL.
|
|
|
|
2. **Shared types** -- Update `ChatMessage` interface in `packages/shared/src/types/chat.ts`:
|
|
- Add `editedContent: string | null;`
|
|
- Add `editedAt: string | null;`
|
|
|
|
3. **Validators** -- In `packages/shared/src/validators/chat.ts`:
|
|
- Update `updateConversationSchema` to include `agentId: z.string().uuid().optional().nullable()`
|
|
- Add `export const editMessageSchema = z.object({ content: z.string().min(1) });`
|
|
- Add `export const streamMessageSchema = z.object({ content: z.string().min(1), agentId: z.string().uuid().optional().nullable() });`
|
|
|
|
4. **Service methods** -- Add to `chatService` in `server/src/services/chat.ts`:
|
|
- `editMessage(messageId: string, data: { content: string })` -- sets `editedContent = data.content`, `editedAt = new Date()` on the message row, returns the updated row
|
|
- `getMessageHistory(conversationId: string)` -- selects all messages WHERE conversationId matches, ORDER BY createdAt ASC (ascending, for LLM context window). Returns `ChatMessage[]`. Use `editedContent ?? content` as the effective content field (alias as `effectiveContent` in the return).
|
|
- Update `updateConversation` to accept and persist `agentId` field: `set({ title: data.title, agentId: data.agentId, updatedAt: new Date() })`. Only set fields that are provided (check `data.agentId !== undefined` before including in set).
|
|
|
|
5. **Extend existing tests** in `server/src/__tests__/chat-routes.test.ts`:
|
|
- Add test: `PATCH /conversations/:id with agentId` -- create conversation, PATCH with `{ agentId: someAgentId }`, verify response has the agentId set. (Use a dummy UUID string for agentId if the test DB doesn't enforce FK -- check existing test patterns.)
|
|
- Add test: `PUT /conversations/:id/messages/:messageId` -- create conversation, add message, PUT with `{ content: "edited" }`, verify response has `editedContent: "edited"` and `editedAt` is not null.
|
|
</action>
|
|
<verify>
|
|
<automated>pnpm --filter @paperclipai/server test run -- --reporter=verbose chat-routes</automated>
|
|
</verify>
|
|
<acceptance_criteria>
|
|
- grep -q "editedContent" packages/db/src/schema/chat_messages.ts returns 0
|
|
- grep -q "editedAt" packages/db/src/schema/chat_messages.ts returns 0
|
|
- grep -q "editedContent: string | null" packages/shared/src/types/chat.ts returns 0
|
|
- grep -q "editMessageSchema" packages/shared/src/validators/chat.ts returns 0
|
|
- grep -q "streamMessageSchema" packages/shared/src/validators/chat.ts returns 0
|
|
- grep -q "agentId" packages/shared/src/validators/chat.ts (in updateConversationSchema) returns 0
|
|
- grep -q "editMessage" server/src/services/chat.ts returns 0
|
|
- grep -q "getMessageHistory" server/src/services/chat.ts returns 0
|
|
- Migration SQL file exists in packages/db/src/migrations/
|
|
- pnpm --filter @paperclipai/server test run -- chat-routes exits 0
|
|
</acceptance_criteria>
|
|
<done>DB has editedContent/editedAt columns, shared types updated, validators for stream/edit/agentId exist, service has editMessage + getMessageHistory, all tests pass</done>
|
|
</task>
|
|
|
|
<task type="auto" tdd="true">
|
|
<name>Task 2: SSE echo-stream endpoint + edit message route + stream tests</name>
|
|
<files>
|
|
server/src/routes/chat.ts,
|
|
server/src/__tests__/chat-stream-routes.test.ts
|
|
</files>
|
|
<read_first>
|
|
server/src/routes/chat.ts,
|
|
server/src/routes/plugins.ts (lines 1095-1186 for SSE pattern),
|
|
server/src/services/chat.ts,
|
|
packages/shared/src/validators/chat.ts,
|
|
server/src/__tests__/chat-routes.test.ts
|
|
</read_first>
|
|
<behavior>
|
|
- Test: GET /conversations/:id/stream?triggerMessageId=X returns Content-Type text/event-stream
|
|
- Test: GET /conversations/:id/stream?triggerMessageId=X returns X-Accel-Buffering: no header
|
|
- Test: Stream sends initial `:ok` comment, then token events, then a done event
|
|
- Test: PUT /conversations/:id/messages/:messageId route validates body with editMessageSchema
|
|
- Test: Client close (req.destroy()) stops the stream loop
|
|
</behavior>
|
|
<action>
|
|
**IMPORTANT -- Echo-stream placeholder:** This task implements an echo-stream (replays the user's
|
|
last message word-by-word) as a functional placeholder. This is intentional -- it fully exercises
|
|
the SSE pipeline so the UI (Plans 02/03) can develop against real streaming behavior. Phase 23
|
|
will replace the echo loop body with real LLM adapter calls. The SSE contract (token events,
|
|
done event, abort detection, message persistence) is the deliverable here, not LLM content.
|
|
|
|
1. **Edit message route** -- Add to `server/src/routes/chat.ts`:
|
|
```typescript
|
|
// PUT /conversations/:id/messages/:messageId
|
|
router.put("/conversations/:id/messages/:messageId", validate(editMessageSchema), async (req, res) => {
|
|
assertBoard(req);
|
|
const message = await svc.editMessage(req.params.messageId as string, req.body);
|
|
if (!message) {
|
|
res.status(404).json({ error: "Not found" });
|
|
return;
|
|
}
|
|
res.json(message);
|
|
});
|
|
```
|
|
|
|
2. **SSE stream endpoint** -- Add to `server/src/routes/chat.ts`:
|
|
```typescript
|
|
// GET /conversations/:id/stream
|
|
router.get("/conversations/:id/stream", async (req, res) => {
|
|
assertBoard(req);
|
|
const conversationId = req.params.id as string;
|
|
const triggerMessageId = req.query.triggerMessageId as string | undefined;
|
|
|
|
const conversation = await svc.getConversation(conversationId);
|
|
if (!conversation) {
|
|
res.status(404).json({ error: "Not found" });
|
|
return;
|
|
}
|
|
|
|
// Set SSE headers -- copied from plugins.ts:1146
|
|
res.writeHead(200, {
|
|
"Content-Type": "text/event-stream",
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"X-Accel-Buffering": "no",
|
|
});
|
|
res.flushHeaders();
|
|
res.write(":ok\n\n");
|
|
|
|
let aborted = false;
|
|
req.on("close", () => { aborted = true; });
|
|
|
|
// Resolve the agent for this conversation
|
|
const agentId = conversation.agentId;
|
|
|
|
// Get message history for LLM context
|
|
const history = await svc.getMessageHistory(conversationId);
|
|
|
|
// ECHO-STREAM PLACEHOLDER (Phase 22):
|
|
// Streams the user's last message back word-by-word to fully exercise the SSE
|
|
// pipeline. Phase 23 replaces this block with:
|
|
// const adapter = resolveAdapter(agentId);
|
|
// for await (const token of adapter.stream(history)) { ... }
|
|
const lastUserMsg = history.filter(m => m.role === "user").at(-1);
|
|
const echoContent = lastUserMsg
|
|
? `Echo from agent: ${lastUserMsg.content}`
|
|
: "No message to echo.";
|
|
const tokens = echoContent.split(/(\s+)/);
|
|
|
|
let accumulated = "";
|
|
for (const token of tokens) {
|
|
if (aborted) break;
|
|
accumulated += token;
|
|
res.write(`data: ${JSON.stringify({ type: "token", content: token })}\n\n`);
|
|
// Tiny yield to allow abort detection
|
|
await new Promise(resolve => setTimeout(resolve, 5));
|
|
}
|
|
|
|
// Persist assistant message only if stream completed (not aborted)
|
|
if (!aborted && accumulated.trim()) {
|
|
const assistantMsg = await svc.addMessage(conversationId, {
|
|
role: "assistant",
|
|
content: accumulated,
|
|
agentId,
|
|
});
|
|
res.write(`data: ${JSON.stringify({ type: "done", messageId: assistantMsg.id })}\n\n`);
|
|
} else if (aborted) {
|
|
// Do NOT persist partial messages per RESEARCH.md pitfall 4
|
|
}
|
|
|
|
res.end();
|
|
});
|
|
```
|
|
|
|
Import `editMessageSchema` and `streamMessageSchema` from `@paperclipai/shared` at the top of the routes file (alongside existing imports).
|
|
|
|
3. **Stream tests** -- Create `server/src/__tests__/chat-stream-routes.test.ts`:
|
|
- Use the same test DB setup pattern as `chat-routes.test.ts` (read that file for the pattern).
|
|
- Test: `GET /conversations/:id/stream?triggerMessageId=X` -- create conversation, add user message, open stream, collect all SSE data events, verify:
|
|
- Response status is 200
|
|
- Content-Type header contains "text/event-stream"
|
|
- X-Accel-Buffering header is "no"
|
|
- First received data is `:ok` comment (or first data event has type "token")
|
|
- Last data event has `type: "done"` with a `messageId` string
|
|
- Test: `GET /conversations/:id/stream` for non-existent conversation returns 404
|
|
- Test: After stream completes, a new assistant message exists in the DB (query via list messages)
|
|
- Test: `PUT /conversations/:id/messages/:messageId` with valid body returns 200 and editedContent matches
|
|
|
|
For SSE testing: use supertest's `.buffer(true).parse(...)` or collect the raw response body. Alternatively, make a raw HTTP request to the test server and read the stream. Follow whatever pattern the existing test file uses for HTTP calls.
|
|
|
|
4. Add the `editMessageSchema` and `streamMessageSchema` imports to the routes file's import block from `@paperclipai/shared`.
|
|
</action>
|
|
<verify>
|
|
<automated>pnpm --filter @paperclipai/server test run -- --reporter=verbose chat-stream</automated>
|
|
</verify>
|
|
<acceptance_criteria>
|
|
- grep -q 'router.get("/conversations/:id/stream"' server/src/routes/chat.ts returns 0
|
|
- grep -q 'router.put("/conversations/:id/messages/:messageId"' server/src/routes/chat.ts returns 0
|
|
- grep -q "text/event-stream" server/src/routes/chat.ts returns 0
|
|
- grep -q "X-Accel-Buffering" server/src/routes/chat.ts returns 0
|
|
- grep -q "flushHeaders" server/src/routes/chat.ts returns 0
|
|
- grep -q 'type: "done"' server/src/routes/chat.ts returns 0
|
|
- grep -q 'type: "token"' server/src/routes/chat.ts returns 0
|
|
- grep -q "ECHO-STREAM PLACEHOLDER" server/src/routes/chat.ts returns 0
|
|
- test -f server/src/__tests__/chat-stream-routes.test.ts
|
|
- pnpm --filter @paperclipai/server test run -- chat-stream exits 0
|
|
- pnpm --filter @paperclipai/server test run exits 0 (all server tests green)
|
|
</acceptance_criteria>
|
|
<done>SSE echo-stream endpoint returns text/event-stream with token+done events (placeholder for Phase 23 LLM integration), edit message route works, abort detection stops streaming, all server tests pass</done>
|
|
</task>
|
|
|
|
</tasks>
|
|
|
|
<verification>
|
|
- `pnpm --filter @paperclipai/server test run` -- all server tests pass
|
|
- `pnpm db:generate` has been run and migration exists
|
|
- SSE endpoint tested with token + done events (echo-stream mode)
|
|
- Edit message route tested with editedContent persistence
|
|
- PATCH conversation with agentId tested
|
|
</verification>
|
|
|
|
<success_criteria>
|
|
1. New migration SQL exists and applies the editedContent + editedAt columns
|
|
2. GET /conversations/:id/stream returns text/event-stream with token events then done event (echo-stream placeholder -- Phase 23 replaces with real LLM)
|
|
3. PUT /conversations/:id/messages/:messageId updates editedContent and editedAt
|
|
4. PATCH /conversations/:id with { agentId } persists the agent selection
|
|
5. All server tests pass (both chat-routes and chat-stream-routes)
|
|
</success_criteria>
|
|
|
|
<output>
|
|
After completion, create `.planning/phases/22-agent-streaming/22-01-SUMMARY.md`
|
|
</output>
|