nexus/.planning/phases/40-job-infrastructure/40-02-PLAN.md


---
phase: 40-job-infrastructure
plan: 02
type: execute
wave: 2
depends_on: ["40-01"]
files_modified:
- server/src/routes/content-jobs.ts
- server/src/app.ts
- server/src/__tests__/content-jobs-routes.test.ts
- server/src/__tests__/content-jobs-sse.test.ts
autonomous: true
requirements:
- INFRA-01
- INFRA-02
- INFRA-03
- INFRA-04
must_haves:
truths:
- "POST /api/companies/:companyId/content-jobs returns 202 with jobId and status within 200ms"
- "GET /api/companies/:companyId/content-jobs/:jobId returns the job record with current status"
- "GET /api/companies/:companyId/content-jobs lists all jobs for a company ordered by createdAt desc"
- "GET /api/companies/:companyId/content-jobs/:jobId/events streams SSE events until terminal state then closes"
- "SSE connection is cleaned up when client disconnects mid-job"
- "Every submitted job accepts sourceTaskId for asset linkage"
artifacts:
- path: "server/src/routes/content-jobs.ts"
provides: "HTTP routes for content job submission, retrieval, and SSE progress"
exports: ["contentJobRoutes"]
- path: "server/src/__tests__/content-jobs-routes.test.ts"
provides: "Integration tests for content job routes"
min_lines: 50
- path: "server/src/__tests__/content-jobs-sse.test.ts"
provides: "Integration tests for SSE endpoint"
min_lines: 30
key_links:
- from: "server/src/routes/content-jobs.ts"
to: "server/src/services/content-job-store.ts"
via: "contentJobStore(db) calls in route handlers"
pattern: "contentJobStore\\(db\\)"
- from: "server/src/routes/content-jobs.ts"
to: "server/src/services/content-job-runner.ts"
via: "contentJobRunner.dispatch() in POST handler"
pattern: "contentJobRunner\\.dispatch"
- from: "server/src/routes/content-jobs.ts"
to: "server/src/services/live-events.ts"
via: "subscribeCompanyLiveEvents in SSE endpoint"
pattern: "subscribeCompanyLiveEvents"
- from: "server/src/app.ts"
to: "server/src/routes/content-jobs.ts"
via: "api.use mount"
pattern: "contentJobRoutes"
---
<objective>
Wire the HTTP routes for content job submission (POST 202), retrieval (GET), listing (GET), and SSE progress streaming (GET /events). Mount in app.ts. Add integration tests covering all INFRA requirements.
Purpose: Completes the public API surface that downstream phases (41-45) and browser clients use to submit and monitor content generation jobs.
Output: content-jobs.ts route file, app.ts mount, two test files.
</objective>
<execution_context>
@$HOME/.claude/get-shit-done/workflows/execute-plan.md
@$HOME/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/40-job-infrastructure/40-RESEARCH.md
@.planning/phases/40-job-infrastructure/40-01-SUMMARY.md
<interfaces>
<!-- Contracts from Plan 01 that this plan depends on. -->
From server/src/services/content-job-store.ts (created in 40-01):
```typescript
export function contentJobStore(db: Db) {
return {
create: (companyId: string, data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null }) => Promise<ContentJob>,
getById: (id: string) => Promise<ContentJob | null>,
listByCompany: (companyId: string) => Promise<ContentJob[]>,
transition: (id: string, patch: Partial<...>) => Promise<void>,
};
}
```
From server/src/services/content-job-runner.ts (created in 40-01):
```typescript
export const contentJobRunner = {
dispatch(db: Db, storage: StorageService, job: ContentJob): void,
};
```
From server/src/services/live-events.ts:
```typescript
export function subscribeCompanyLiveEvents(companyId: string, listener: (event: LiveEvent) => void): () => void;
```
From server/src/routes/authz.ts:
```typescript
export function assertCompanyAccess(req: Request, companyId: string): void;
```
From server/src/app.ts — route mounting pattern:
```typescript
import { assetRoutes } from "./routes/assets.js";
// ...
api.use(assetRoutes(db, opts.storageService));
```
From server/src/__tests__/assets.test.ts — test pattern uses supertest + createApp:
```typescript
import { describe, it, expect } from "vitest";
import request from "supertest";
```
</interfaces>
</context>
<tasks>
<task type="auto" tdd="true">
<name>Task 1: Content job routes and app.ts wiring</name>
<files>
server/src/routes/content-jobs.ts,
server/src/app.ts
</files>
<read_first>
server/src/routes/assets.ts,
server/src/routes/voice.ts,
server/src/routes/authz.ts,
server/src/app.ts,
server/src/services/content-job-store.ts,
server/src/services/content-job-runner.ts,
server/src/services/live-events.ts
</read_first>
<behavior>
- POST /api/companies/:companyId/content-jobs: validates companyAccess, accepts { jobType, input?, sourceTaskId? }, creates job via store, calls contentJobRunner.dispatch (fire-and-forget), returns 202 { jobId, status, createdAt }
- GET /api/companies/:companyId/content-jobs: validates companyAccess, returns array of jobs from store.listByCompany
- GET /api/companies/:companyId/content-jobs/:jobId: validates companyAccess, returns job from store.getById, 404 if not found
- GET /api/companies/:companyId/content-jobs/:jobId/events: validates companyAccess; sets SSE headers (Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive) and calls flushHeaders; sends the current status immediately and ends the response if the job is already terminal (done or failed); otherwise subscribes to live events, forwards only content_job.* events for this jobId as status events, ends the response on a terminal status, and unsubscribes on req close
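A minimal sketch of the POST body handling from the first bullet above, written as a pure function so it can be checked without Express. The names `parseJobSubmission`, `JobSubmission`, and `ParseResult` are hypothetical and do not exist in the codebase. The type checks on `input` and `sourceTaskId` are an assumption of this sketch; the plan itself only calls for `?? {}` and `?? null` defaults.

```typescript
// Hypothetical helper: none of these names come from the plan's codebase.
type JobSubmission = {
  jobType: string;
  input: Record<string, unknown>;
  sourceTaskId: string | null;
};

type ParseResult =
  | { ok: true; value: JobSubmission }
  | { ok: false; status: 400; error: string };

function parseJobSubmission(body: unknown): ParseResult {
  const raw = (typeof body === "object" && body !== null ? body : {}) as Record<string, unknown>;
  const jobType = raw.jobType;
  // jobType must be a non-empty string, otherwise the route answers 400.
  if (typeof jobType !== "string" || jobType.trim() === "") {
    return { ok: false, status: 400, error: "jobType is required" };
  }
  // input and sourceTaskId are optional; fall back to {} and null.
  const input =
    typeof raw.input === "object" && raw.input !== null && !Array.isArray(raw.input)
      ? (raw.input as Record<string, unknown>)
      : {};
  const sourceTaskId = typeof raw.sourceTaskId === "string" ? raw.sourceTaskId : null;
  return { ok: true, value: { jobType, input, sourceTaskId } };
}
```

In a handler, the `ok: false` branch would map to `res.status(400).json({ error })`, and the `ok: true` value could be passed straight to `store.create(companyId, value)`.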
</behavior>
<action>
1. Create `server/src/routes/content-jobs.ts`:
```typescript
import { Router } from "express";
import type { Db } from "@paperclipai/db";
import type { StorageService } from "../storage/types.js";
import { contentJobStore } from "../services/content-job-store.js";
import { contentJobRunner } from "../services/content-job-runner.js";
import { subscribeCompanyLiveEvents } from "../services/live-events.js";
import { assertCompanyAccess } from "./authz.js";
export function contentJobRoutes(db: Db, storage: StorageService) {
const router = Router();
// ... mount 4 endpoints below under /companies/:companyId/content-jobs
}
```
POST handler at `/companies/:companyId/content-jobs`:
- `const companyId = req.params.companyId!;`
- `assertCompanyAccess(req, companyId);`
- Destructure `{ jobType, input, sourceTaskId }` from `req.body`
- Validate jobType is a non-empty string; return 400 `{ error: "jobType is required" }` if missing
- `const store = contentJobStore(db);`
- `const job = await store.create(companyId, { jobType, input: input ?? {}, sourceTaskId: sourceTaskId ?? null });`
- `void contentJobRunner.dispatch(db, storage, job);` (fire-and-forget, do NOT await)
- `res.status(202).json({ jobId: job.id, status: job.status, createdAt: job.createdAt });`
GET list handler at `/companies/:companyId/content-jobs`:
- assertCompanyAccess, then `const jobs = await contentJobStore(db).listByCompany(companyId);`
- `res.json(jobs);`
GET by ID handler at `/companies/:companyId/content-jobs/:jobId`:
- assertCompanyAccess, then `const job = await contentJobStore(db).getById(req.params.jobId!);`
- If !job, `res.status(404).json({ error: "Job not found" }); return;`
- `res.json(job);`
SSE handler at `/companies/:companyId/content-jobs/:jobId/events`:
- assertCompanyAccess
- Set headers: `Content-Type: text/event-stream`, `Cache-Control: no-cache`, `Connection: keep-alive`
- `res.flushHeaders();`
- Define helper: ``const sendEvent = (type: string, data: unknown) => { res.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`); };`` (single-backslash `\n` inside the template literal, so each frame ends with a real blank line)
- Fetch current job: `const job = await contentJobStore(db).getById(req.params.jobId!);`
- If !job, `sendEvent("error", { error: "Job not found" }); res.end(); return;`
- `sendEvent("status", { jobId: job.id, status: job.status, resultAssetId: job.resultAssetId, errorMessage: job.errorMessage });`
- If `job.status === "done" || job.status === "failed"`, `res.end(); return;`
- Subscribe: `const unsubscribe = subscribeCompanyLiveEvents(companyId, (event) => { ... });`
- In listener: check `event.type` starts with `"content_job."` and `event.payload.jobId === req.params.jobId`
- Send `sendEvent("status", event.payload);`
- If `event.payload.status === "done" || event.payload.status === "failed"`, call `unsubscribe(); res.end();`
- Register cleanup: `req.on("close", () => { unsubscribe(); });`
- Return router
2. Modify `server/src/app.ts`:
- Add import: `import { contentJobRoutes } from "./routes/content-jobs.js";`
- Add mount after the `api.use(voiceRoutes());` line: `api.use(contentJobRoutes(db, opts.storageService));`
Anti-patterns to avoid:
- Do NOT await contentJobRunner.dispatch — must be `void` (fire-and-forget)
- Do NOT use a polling loop in SSE — subscribe to EventEmitter
- Do NOT forget req.on("close") cleanup — prevents listener leaks
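The SSE lifecycle described above (initial status, filter by jobId, end on terminal, clean up on disconnect) can be sketched without Express. Everything here is an illustrative stand-in: `SseSink` plays the role of `res`, `Subscribe` mirrors the shape of `subscribeCompanyLiveEvents`, and `streamJobEvents` and `formatSseEvent` are hypothetical names. The real `LiveEvent` type may differ.

```typescript
// Illustrative stand-ins; the real LiveEvent and response types live in the codebase.
type JobStatusPayload = { jobId: string; status: string };
type LiveEventLike = { type: string; payload: JobStatusPayload };
type Listener = (event: LiveEventLike) => void;
type Subscribe = (companyId: string, listener: Listener) => () => void;
type SseSink = { write(chunk: string): void; end(): void };

const TERMINAL = new Set(["done", "failed"]);

// One SSE frame: an event name line, a data line, then a blank line.
function formatSseEvent(type: string, data: unknown): string {
  return `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Returns the cleanup function the route would register via req.on("close", ...).
function streamJobEvents(
  sink: SseSink,
  subscribe: Subscribe,
  companyId: string,
  current: JobStatusPayload,
): () => void {
  let closed = false;
  let unsubscribe: () => void = () => {};
  const cleanup = () => {
    if (closed) return; // idempotent: terminal end and client disconnect may both fire
    closed = true;
    unsubscribe();
  };

  // Always send the current status first.
  sink.write(formatSseEvent("status", current));
  if (TERMINAL.has(current.status)) {
    closed = true;
    sink.end();
    return cleanup;
  }

  unsubscribe = subscribe(companyId, (event) => {
    if (closed) return;
    if (!event.type.startsWith("content_job.")) return;
    if (event.payload.jobId !== current.jobId) return;
    sink.write(formatSseEvent("status", event.payload));
    if (TERMINAL.has(event.payload.status)) {
      cleanup();
      sink.end();
    }
  });
  // Covers a listener that fired synchronously with a terminal event during subscribe.
  if (closed) unsubscribe();
  return cleanup;
}
```

Making `cleanup` idempotent keeps the terminal path and the `req.on("close")` path from unsubscribing twice. The sketch does not poll; it only reacts to pushed events.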
</action>
<verify>
<automated>cd /opt/nexus && pnpm tsc --noEmit --project server/tsconfig.json</automated>
</verify>
<acceptance_criteria>
- server/src/routes/content-jobs.ts contains `export function contentJobRoutes(db: Db, storage: StorageService)`
- server/src/routes/content-jobs.ts contains `res.status(202).json(`
- server/src/routes/content-jobs.ts contains `void contentJobRunner.dispatch(`
- server/src/routes/content-jobs.ts contains `text/event-stream`
- server/src/routes/content-jobs.ts contains `res.flushHeaders()`
- server/src/routes/content-jobs.ts contains `subscribeCompanyLiveEvents(`
- server/src/routes/content-jobs.ts contains `req.on("close"`
- server/src/routes/content-jobs.ts contains `assertCompanyAccess(`
- server/src/routes/content-jobs.ts contains `jobType is required`
- server/src/app.ts contains `import { contentJobRoutes } from "./routes/content-jobs.js"`
- server/src/app.ts contains `contentJobRoutes(db, opts.storageService)`
- `pnpm tsc --noEmit --project server/tsconfig.json` exits 0
</acceptance_criteria>
<done>Four route endpoints operational: POST returns 202 with jobId, GET list returns jobs, GET by ID returns single job or 404, GET events streams SSE until terminal. All mounted in app.ts. TypeScript compiles clean.</done>
</task>
<task type="auto" tdd="true">
<name>Task 2: Integration tests for content job routes and SSE</name>
<files>
server/src/__tests__/content-jobs-routes.test.ts,
server/src/__tests__/content-jobs-sse.test.ts
</files>
<read_first>
server/src/__tests__/assets.test.ts,
server/src/__tests__/chat-routes.test.ts,
server/src/routes/content-jobs.ts,
server/vitest.config.ts
</read_first>
<behavior>
- content-jobs-routes.test.ts:
- POST /api/companies/:id/content-jobs returns 202 with { jobId, status: "queued", createdAt } (INFRA-01)
- POST without jobType returns 400 (INFRA-01)
- POST with sourceTaskId stores it on the created job (INFRA-04)
- GET /api/companies/:id/content-jobs returns array of jobs (INFRA-01)
- GET /api/companies/:id/content-jobs/:jobId returns the job (INFRA-01)
- GET /api/companies/:id/content-jobs/nonexistent returns 404 (INFRA-01)
- content-jobs-sse.test.ts:
- GET /api/companies/:id/content-jobs/:jobId/events returns Content-Type text/event-stream (INFRA-02)
- SSE sends initial status event immediately (INFRA-02)
- SSE for a terminal job (done/failed) ends the stream after initial event (INFRA-02)
- SSE for a nonexistent job sends an `error` event and ends the stream (INFRA-02)
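As a sketch of what the 202 assertions might check, the type guard below validates the response body shape. The name `isAcceptedJobResponse` is hypothetical, and the field list is taken from the behavior above; the existing test files may prefer plain `expect` calls instead.

```typescript
// Hypothetical test helper; the field names follow the 202 body described above.
type AcceptedJobResponse = { jobId: string; status: string; createdAt: string };

function isAcceptedJobResponse(body: unknown): body is AcceptedJobResponse {
  if (typeof body !== "object" || body === null) return false;
  const b = body as Record<string, unknown>;
  return (
    typeof b.jobId === "string" &&
    b.jobId.length > 0 &&
    typeof b.status === "string" &&
    typeof b.createdAt === "string" // JSON serializes Date values to ISO strings
  );
}
```

A test could then assert `isAcceptedJobResponse(res.body)` and separately check that `res.body.status` is `"queued"`.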
</behavior>
<action>
1. Create `server/src/__tests__/content-jobs-routes.test.ts`:
- Follow the pattern from existing test files (assets.test.ts, chat-routes.test.ts)
- Import describe, it, expect, beforeAll, afterAll from "vitest"
- Import supertest as request
- Set up test app using the same test harness pattern as other test files in the codebase
- Read the existing test setup pattern from assets.test.ts or chat-routes.test.ts to understand how db/app is initialized in tests
Test cases:
a) `it("POST /api/companies/:id/content-jobs returns 202 with jobId")` — POST with jobType: "test", verify status 202, body has jobId (string), status === "queued", createdAt (string)
b) `it("POST without jobType returns 400")` — POST with empty body, verify status 400, body.error contains "jobType"
c) `it("POST with sourceTaskId persists it")` — POST with jobType: "test", sourceTaskId: "task-abc-123", then GET the job by ID, verify response body has sourceTaskId === "task-abc-123"
d) `it("GET /api/companies/:id/content-jobs returns job list")` — after creating a job, GET list, verify array with length >= 1
e) `it("GET /api/companies/:id/content-jobs/:jobId returns job")` — create job, GET by ID, verify body.id matches
f) `it("GET /api/companies/:id/content-jobs/nonexistent returns 404")` — GET with random UUID, verify 404
2. Create `server/src/__tests__/content-jobs-sse.test.ts`:
- Same test harness setup
Test cases:
a) `it("SSE endpoint returns text/event-stream content type")` — GET /events, verify Content-Type header contains "text/event-stream"
b) `it("SSE sends initial status event")` — GET /events, read first chunk, verify it contains `event: status` and the job's current status
c) `it("SSE for nonexistent job sends error event")` — GET /events with random UUID, verify response contains `event: error`
Note: Follow whichever test harness setup pattern the existing tests use (createTestApp helper, beforeAll/afterAll DB setup, etc.). Read the existing test files first to match the pattern exactly.
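For the SSE assertions, a small parser can turn the raw response text into frames so tests can check `event: status` and the payload separately. This is a sketch under assumptions: `parseSseFrames` is a hypothetical helper, and it assumes one `data:` line per frame, which matches the `sendEvent` format in Task 1 but not the full SSE grammar.

```typescript
type SseFrame = { event: string; data: unknown };

// Splits a raw SSE response body into frames; assumes one data line per frame.
function parseSseFrames(body: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of body.split("\n\n")) {
    if (block.trim() === "") continue;
    let event = "message"; // SSE default event name when no event: line is present
    let data = "";
    for (const line of block.split("\n")) {
      if (line.startsWith("event:")) event = line.slice("event:".length).trim();
      else if (line.startsWith("data:")) data = line.slice("data:".length).trim();
    }
    frames.push({ event, data: data === "" ? null : JSON.parse(data) });
  }
  return frames;
}
```

With a response body in hand, a test could call `parseSseFrames(res.text)` and assert on the first frame's `event` and `data.status`.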
</action>
<verify>
<automated>cd /opt/nexus && pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-routes.test.ts src/__tests__/content-jobs-sse.test.ts</automated>
</verify>
<acceptance_criteria>
- server/src/__tests__/content-jobs-routes.test.ts exists with at least 50 lines
- server/src/__tests__/content-jobs-routes.test.ts contains `202`
- server/src/__tests__/content-jobs-routes.test.ts contains `400`
- server/src/__tests__/content-jobs-routes.test.ts contains `404`
- server/src/__tests__/content-jobs-routes.test.ts contains `sourceTaskId`
- server/src/__tests__/content-jobs-routes.test.ts contains `content-jobs`
- server/src/__tests__/content-jobs-sse.test.ts exists with at least 30 lines
- server/src/__tests__/content-jobs-sse.test.ts contains `text/event-stream`
- server/src/__tests__/content-jobs-sse.test.ts contains `event: status`
- All tests pass: `pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs` exits 0
</acceptance_criteria>
<done>Integration tests cover: 202 job submission, 400 validation, sourceTaskId persistence (INFRA-04), job listing, single job retrieval, 404 missing job, SSE content-type, SSE initial event, SSE error for missing job. All tests pass.</done>
</task>
</tasks>
<verification>
- `pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs` — all tests green
- `pnpm tsc --noEmit --project server/tsconfig.json` — no type errors
- POST /api/companies/:id/content-jobs returns 202 (INFRA-01)
- GET /api/companies/:id/content-jobs/:id/events returns SSE stream (INFRA-02)
- Job created with sourceTaskId shows it in GET response (INFRA-04)
</verification>
<success_criteria>
- 4 HTTP endpoints operational: POST (202), GET list, GET by ID, GET SSE events
- POST validates jobType presence (400 on missing)
- SSE streams events and cleans up on disconnect
- Routes mounted in app.ts
- All integration tests pass
- TypeScript compiles clean
</success_criteria>
<output>
After completion, create `.planning/phases/40-job-infrastructure/40-02-SUMMARY.md`
</output>