Phase 40: Job Infrastructure - Research
Researched: 2026-04-04
Domain: Async job lifecycle, SSE streaming, storage namespacing, asset tracking
Confidence: HIGH
<user_constraints>
User Constraints (from CONTEXT.md)
Locked Decisions
All implementation choices are at Claude's discretion — discuss phase was skipped per user setting. Use ROADMAP phase goal, success criteria, and codebase conventions to guide decisions.
Claude's Discretion
All implementation choices are at Claude's discretion.
Deferred Ideas (OUT OF SCOPE)
None — discuss phase skipped. </user_constraints>
<phase_requirements>
Phase Requirements
| ID | Description | Research Support |
|---|---|---|
| INFRA-01 | System processes content generation jobs asynchronously with queued → running → done/failed lifecycle | New content_jobs table + in-process job runner service; pattern mirrors heartbeat_runs / plugin_job_runs already in codebase |
| INFRA-02 | System pushes job progress updates via SSE to connected clients | Existing live-events.ts EventEmitter + LIVE_EVENT_TYPES const; add new event types and a new SSE endpoint scoped to jobs |
| INFRA-03 | Generated content stored in namespaced storage without size restrictions blocking video/images | StorageService.putFile() already has no server-side byte limit; add generated/ namespace + MAX_GENERATED_ASSET_BYTES constant bypassing the upload-route multer limit |
| INFRA-04 | All generated content tracked in database with source conversation linkage | Extend assets table with a nullable source_task_id text column (no FK, since task IDs are string identifiers); assetService.create() accepts an optional sourceTaskId |
</phase_requirements>
Summary
Phase 40 builds the async foundation that every subsequent content generation phase depends on. The codebase already has two well-established job-tracking patterns (heartbeat_runs, plugin_job_runs) and a working SSE streaming pattern in three routes (voice.ts, puter-proxy.ts, plugins.ts). The work here is additive: a new content_jobs DB table, a minimal in-process job runner, new live-event types for SSE progress, a generated/ storage namespace with its own size constant, and a source_task_id column on assets.
No external queue infrastructure (Redis, BullMQ) is needed. The project is single-user and local-first. An in-process async runner (a fire-and-forget void Promise) with EventEmitter fan-out, matching the heartbeat pattern, is the right fit. The trade-off is durability. A renderer error is caught and recorded as failed. Jobs in flight when the process exits are lost, and their rows stay queued or running. Orphaned output remains auditable through source_task_id.
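The sketch below shows the runner pattern in miniature and runs standalone. It is illustrative only: a Map stands in for the content_jobs table, a bare EventEmitter stands in for publishLiveEvent, and `InProcessJobRunner` and its `render` callback are hypothetical names rather than project code.

```typescript
import { EventEmitter } from "node:events";

type JobStatus = "queued" | "running" | "done" | "failed";
interface Job {
  id: string;
  status: JobStatus;
  error: string | null;
}

// Illustrative runner: the Map replaces the content_jobs table and the
// EventEmitter replaces publishLiveEvent / subscribeCompanyLiveEvents.
class InProcessJobRunner {
  private readonly jobs = new Map<string, Job>();
  private readonly events = new EventEmitter();
  private readonly render: (job: Job) => Promise<void>;
  private nextId = 1;

  constructor(render: (job: Job) => Promise<void>) {
    this.render = render;
  }

  // Returns at once (the HTTP 202 path); the work continues in the background.
  submit(): Job {
    const job: Job = { id: String(this.nextId++), status: "queued", error: null };
    this.jobs.set(job.id, job);
    void this.run(job); // render errors are caught inside run() and recorded on the job
    return { ...job };
  }

  getJob(id: string): Job | undefined {
    const job = this.jobs.get(id);
    return job && { ...job };
  }

  // Subscribe to status changes; returns an unsubscribe function.
  onStatus(listener: (job: Job) => void): () => void {
    this.events.on("status", listener);
    return () => this.events.off("status", listener);
  }

  private setStatus(job: Job, status: JobStatus, error: string | null = null): void {
    job.status = status;
    job.error = error;
    this.events.emit("status", { ...job }); // fan-out to every subscriber
  }

  private async run(job: Job): Promise<void> {
    await Promise.resolve(); // yield first, as the real runner awaits a DB write here
    this.setStatus(job, "running");
    try {
      await this.render(job);
      this.setStatus(job, "done");
    } catch (err) {
      this.setStatus(job, "failed", err instanceof Error ? err.message : "Unknown error");
    }
  }
}
```

In the project, the Map would become contentJobStore and the emitter would become publishLiveEvent. The control flow stays the same.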
Primary recommendation: Mirror the heartbeat_runs table/service pattern for the content_jobs table. Use the existing publishLiveEvent function with new content_job.* event types for SSE. Add MAX_GENERATED_ASSET_BYTES as a server-only constant and a generated/ namespace prefix. Add source_task_id to assets via a migration.
Standard Stack
Core
| Library | Version (verified) | Purpose | Why Standard |
|---|---|---|---|
| drizzle-orm | 0.38.4 (pkg) | Schema definition + query builder | Already used throughout; schema in packages/db/src/schema/ |
| postgres (postgres.js) | project dep | DB connection | Already used via createDb() in packages/db/src/client.ts |
| express | project dep | HTTP layer for new routes | All routes are Express; follows existing pattern |
| Node.js EventEmitter | built-in | In-process pub/sub for SSE fan-out | Already used in live-events.ts; no extra dep |
Supporting
| Library | Version | Purpose | When to Use |
|---|---|---|---|
| drizzle-kit | 0.31.9 (pkg) | Migration generation | Run pnpm db:generate after schema change |
| vitest | project dep | Unit + integration tests | All server tests use vitest; pattern in server/src/__tests__/ |
| supertest | project dep | HTTP route tests | Used in assets.test.ts, chat-routes.test.ts, etc. |
Alternatives Considered
| Instead of | Could Use | Tradeoff |
|---|---|---|
| In-process EventEmitter runner | BullMQ / Redis | Redis adds infra complexity; single-user single-process — EventEmitter is correct here |
| In-process EventEmitter runner | Worker threads | Unnecessary isolation; jobs are I/O-bound (renders call child processes) |
| Custom SSE endpoint | WebSocket upgrade | SSE is simpler for one-way server → client push. WebSocket is already used for live events via live-events-ws.ts, but per-job progress streams stay on SSE, following the existing voice/puter patterns |
Installation: No new packages required. All tooling already present.
Architecture Patterns
Recommended Project Structure
```text
packages/db/src/schema/
├── content_jobs.ts # NEW: content_jobs table definition
├── assets.ts # MODIFY: add source_task_id column
├── index.ts # MODIFY: export content_jobs
packages/db/src/migrations/
├── 0056_create_content_jobs.sql # NEW: generated via drizzle-kit
├── 0057_assets_source_task_id.sql # NEW: generated via drizzle-kit
packages/shared/src/
├── constants.ts # MODIFY: add content_job.* to LIVE_EVENT_TYPES
                 #         add CONTENT_JOB_STATUSES constant
server/src/
├── services/
│ ├── content-job-store.ts # NEW: DB CRUD for content_jobs
│ ├── content-job-runner.ts # NEW: async executor + live-event publisher
│ └── index.ts # MODIFY: export new services
├── routes/
│ ├── content-jobs.ts # NEW: POST /companies/:id/content-jobs
│ │                   #      GET /companies/:id/content-jobs/:jobId
│ │                   #      GET /companies/:id/content-jobs/:jobId/events (SSE)
│ └── index.ts # MODIFY: mount content-job routes
├── app.ts # MODIFY: register routes
```
Pattern 1: content_jobs Table (mirrors heartbeat_runs / plugin_job_runs)
What: A persisted lifecycle table for async content generation requests. When to use: Any content generation work that may take >200ms.
```ts
// packages/db/src/schema/content_jobs.ts
// Pattern source: packages/db/src/schema/heartbeat_runs.ts + plugin_jobs.ts
import { pgTable, uuid, text, timestamp, jsonb, index } from "drizzle-orm/pg-core";
import { companies } from "./companies.js";

// Status lifecycle: queued → running → done | failed
// The project structure places CONTENT_JOB_STATUSES in packages/shared/src/constants.ts;
// keep a single definition and import it here if packages/db can depend on shared.
export const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const;
export type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number];

export const contentJobs = pgTable(
  "content_jobs",
  {
    id: uuid("id").primaryKey().defaultRandom(),
    companyId: uuid("company_id").notNull().references(() => companies.id),
    jobType: text("job_type").notNull(), // e.g. "diagram", "theme", "video"
    status: text("status").$type<ContentJobStatus>().notNull().default("queued"),
    input: jsonb("input").notNull().default({}), // renderer-specific params
    resultAssetId: uuid("result_asset_id"), // populated on done
    errorMessage: text("error_message"), // populated on failed
    sourceTaskId: text("source_task_id"), // conversation task linkage (INFRA-04)
    startedAt: timestamp("started_at", { withTimezone: true }),
    finishedAt: timestamp("finished_at", { withTimezone: true }),
    createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
    updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  },
  (table) => ({
    companyStatusIdx: index("content_jobs_company_status_idx").on(table.companyId, table.status),
    companyCreatedIdx: index("content_jobs_company_created_idx").on(table.companyId, table.createdAt),
  }),
);
```
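The lifecycle comment in the schema can also be enforced in code. Here is a small sketch, assuming a hypothetical `assertTransition` helper that the runner would call before each store.transition(). Nothing like it exists in the codebase yet.

```typescript
type ContentJobStatus = "queued" | "running" | "done" | "failed";

// Legal next states for each status, following queued → running → done | failed.
// done and failed are terminal.
const NEXT: Record<ContentJobStatus, readonly ContentJobStatus[]> = {
  queued: ["running"],
  running: ["done", "failed"],
  done: [],
  failed: [],
};

function canTransition(from: ContentJobStatus, to: ContentJobStatus): boolean {
  return NEXT[from].includes(to);
}

function isTerminal(status: ContentJobStatus): boolean {
  return NEXT[status].length === 0;
}

// Throws on an illegal move so a bug surfaces as a failed job, not a corrupt row.
function assertTransition(from: ContentJobStatus, to: ContentJobStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal content job transition: ${from} -> ${to}`);
  }
}
```

If jobs can fail before they start (for example on a dispatch error), adding "failed" to the queued entry is a reasonable design choice.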
Pattern 2: HTTP 202 + Job ID Response
What: POST to submit a job returns immediately with jobId. When to use: All content generation submissions.
```ts
// server/src/routes/content-jobs.ts (inside the route factory, which receives db and storage)
router.post("/companies/:companyId/content-jobs", async (req, res) => {
  const { companyId } = req.params;
  assertCompanyAccess(req, companyId);
  const job = await contentJobStore(db).create(companyId, {
    jobType: req.body.jobType,
    input: req.body.input ?? {},
    sourceTaskId: req.body.sourceTaskId ?? null,
  });
  // Fire and forget: never await the runner here
  void contentJobRunner.dispatch(db, storage, job);
  res.status(202).json({ jobId: job.id, status: job.status });
});
```
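void only discards the promise; it does not handle a rejection. dispatch() could reject, for example if the failed-status store.transition() in runJob's catch block throws. Since Node 15, an unhandled rejection terminates the process by default. Below is a hedged sketch of a wrapper; `fireAndForget` and its `onError` callback are hypothetical names.

```typescript
// Hypothetical helper: start background work without awaiting it, but route
// any failure to a handler instead of leaving an unhandled rejection.
function fireAndForget(work: () => Promise<unknown>, onError: (err: unknown) => void): void {
  let promise: Promise<unknown>;
  try {
    promise = work();
  } catch (err) {
    onError(err); // work() threw synchronously before returning a promise
    return;
  }
  promise.catch(onError);
}
```

In the route this would read `fireAndForget(() => contentJobRunner.dispatch(db, storage, job), (err) => console.error("content job dispatch failed", err))`. Swap console.error for the project logger if one exists.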
Pattern 3: SSE Job Progress (mirrors voice.ts pattern)
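What: GET endpoint that holds the connection open and pushes events until the job reaches a terminal state. When to use: The browser receives job progress without polling.
```ts
// server/src/routes/content-jobs.ts (inside the route factory, which receives db)
router.get("/companies/:companyId/content-jobs/:jobId/events", async (req, res) => {
  const { companyId, jobId } = req.params;
  assertCompanyAccess(req, companyId);
  const store = contentJobStore(db);

  // Check existence and ownership before switching to an event stream.
  const job = await store.getById(jobId);
  if (!job || job.companyId !== companyId) {
    res.status(404).json({ error: "Content job not found" });
    return;
  }

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders();

  const sendEvent = (type: string, data: unknown) => {
    res.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`);
  };
  const isTerminal = (status: string) => status === "done" || status === "failed";

  let closed = false;
  let unsubscribe = () => {};
  const finish = () => {
    if (closed) return;
    closed = true;
    unsubscribe();
    res.end();
  };

  // Subscribe before re-reading state so a transition in between is not missed.
  // The runner publishes content_job.<status> events (Pattern 4) with { jobId, ... }.
  unsubscribe = subscribeCompanyLiveEvents(companyId, (event) => {
    if (closed || !event.type.startsWith("content_job.") || event.payload.jobId !== jobId) return;
    const status = event.type.slice("content_job.".length);
    sendEvent("status", { ...event.payload, status });
    if (isTerminal(status)) finish();
  });
  req.on("close", finish);

  // Send the current state; it may repeat a status an event just delivered.
  const current = await store.getById(jobId);
  if (closed) return;
  sendEvent("status", { jobId, status: current?.status ?? job.status });
  if (current && isTerminal(current.status)) finish();
});
```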
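The sendEvent helper above writes a single data line. That is safe because JSON.stringify without indentation never emits raw newlines. Multi-line payloads or resumable streams would need more. The SSE format requires every payload line to carry its own `data:` prefix, and an optional `id:` field lets a reconnecting EventSource send Last-Event-ID. A small sketch; `formatSseEvent` is a hypothetical name:

```typescript
// Hypothetical helper: serialize one Server-Sent Events frame.
function formatSseEvent(event: string, data: string, id?: string): string {
  let frame = "";
  if (id !== undefined) frame += `id: ${id}\n`;
  frame += `event: ${event}\n`;
  // SSE treats CRLF, CR, and LF as line breaks; each payload line needs its own
  // "data:" field, and the client rejoins them with "\n".
  for (const line of data.split(/\r\n|\r|\n/)) {
    frame += `data: ${line}\n`;
  }
  return frame + "\n"; // a blank line terminates the event
}
```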
Pattern 4: Live Event Types for Job Progress
What: Extend LIVE_EVENT_TYPES in shared constants. When to use: Publishing job progress from the runner.
```ts
// packages/shared/src/constants.ts: add to the LIVE_EVENT_TYPES array
"content_job.queued",
"content_job.running",
"content_job.done",
"content_job.failed",
```
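LiveEventType is derived from the `as const` array (Pitfall 6), so a narrowing helper can map a content_job.* type back to a job status. The SSE handler needs this because the runner's payload carries only jobId. The sketch uses a trimmed, hypothetical LIVE_EVENT_TYPES list; the real array in packages/shared/src/constants.ts has other entries.

```typescript
// Trimmed stand-in for the shared constant; the real array has more entries.
const LIVE_EVENT_TYPES = [
  "example.existing_event", // placeholder for the types already in the array
  "content_job.queued",
  "content_job.running",
  "content_job.done",
  "content_job.failed",
] as const;
type LiveEventType = (typeof LIVE_EVENT_TYPES)[number];

type ContentJobEventType = Extract<LiveEventType, `content_job.${string}`>;
// Distributes over the union: "queued" | "running" | "done" | "failed"
type StatusOf<T> = T extends `content_job.${infer S}` ? S : never;
type JobStatusFromEvent = StatusOf<ContentJobEventType>;

function isContentJobEvent(eventType: LiveEventType): eventType is ContentJobEventType {
  return eventType.startsWith("content_job.");
}

function statusFromEventType(eventType: ContentJobEventType): JobStatusFromEvent {
  return eventType.slice("content_job.".length) as JobStatusFromEvent;
}
```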
Pattern 5: Generated Asset Storage Namespace
What: a generated/ storage namespace for rendered output. The job runner calls storage.putFile() directly, so the upload route's multer limit never applies.
When to use: Writing rendered output (video, SVG, PDF, PNG) from job runner.
```ts
// server/src/attachment-types.ts: add alongside MAX_ATTACHMENT_BYTES
export const MAX_GENERATED_ASSET_BYTES =
  Number(process.env.PAPERCLIP_GENERATED_ASSET_MAX_BYTES) || 500 * 1024 * 1024; // 500 MB default

// The job runner checks the size, then stores via:
if (outputBuffer.byteLength > MAX_GENERATED_ASSET_BYTES) {
  throw new Error(`Generated asset exceeds ${MAX_GENERATED_ASSET_BYTES} bytes`);
}
const stored = await storage.putFile({
  companyId,
  namespace: "generated", // keeps generated output apart from user uploads
  originalFilename: outputFilename,
  contentType,
  body: outputBuffer, // renderer output; multer is never involved, so its limit does not apply
});
```
Key insight: The upload route (assets.ts) enforces limits via multer limits: { fileSize: MAX_ATTACHMENT_BYTES }. Job runners write directly to storage.putFile() — multer is never involved. The MAX_GENERATED_ASSET_BYTES constant exists for the job runner to validate before calling putFile, but putFile itself has no byte limit.
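Two details of the runner-side check deserve care. First, `Number(env) || default` falls back on empty or non-numeric values but accepts negative numbers and fractions. Second, the check must run on the rendered buffer before storage.putFile(). Below is a hedged sketch; `resolveGeneratedAssetLimit` and `assertGeneratedAssetSize` are hypothetical helpers, and the 500 MB default is the one proposed above.

```typescript
const DEFAULT_GENERATED_ASSET_MAX_BYTES = 500 * 1024 * 1024; // 500 MB

// Parse PAPERCLIP_GENERATED_ASSET_MAX_BYTES, falling back on missing or invalid values.
function resolveGeneratedAssetLimit(raw: string | undefined): number {
  if (raw === undefined || raw.trim() === "") return DEFAULT_GENERATED_ASSET_MAX_BYTES;
  const parsed = Number(raw);
  // Only accept positive whole byte counts.
  return Number.isSafeInteger(parsed) && parsed > 0 ? parsed : DEFAULT_GENERATED_ASSET_MAX_BYTES;
}

// Reject oversized renderer output before it is written to storage.
function assertGeneratedAssetSize(byteLength: number, limit: number): void {
  if (byteLength > limit) {
    throw new Error(`Generated asset is ${byteLength} bytes; limit is ${limit} bytes`);
  }
}
```

Calling assertGeneratedAssetSize inside the runner's try block means an oversized render ends as a failed job with a readable error message.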
Pattern 6: source_task_id on assets (INFRA-04)
What: Nullable column added to assets table.
When to use: Every asset created by a job runner must pass sourceTaskId.
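```ts
// packages/db/src/schema/assets.ts: add column
sourceTaskId: text("source_task_id"), // nullable, no FK: task IDs are string identifiers
// So the schema matches migration 0057, also add to the table's index callback:
// sourceTaskIdIdx: index("assets_source_task_id_idx").on(table.sourceTaskId),

// assetService.create() in server/src/services/assets.ts accepts it through
// the existing spread pattern: db.insert(assets).values({ ...data, companyId }).
// Since the column is nullable, no existing callers break.
```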
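The column is nullable and has no FK, so the database will not flag orphans; auditing happens at query time. Below is a pure-function sketch of that audit. It assumes a hypothetical `AssetRow` shape and a set of task IDs known to still exist; obtaining that set is outside this phase. The input should already be filtered to generated assets, because user uploads legitimately have no source task.

```typescript
interface AssetRow {
  id: string;
  sourceTaskId: string | null;
}

// Split generated assets into those missing a link and those pointing at a task
// that no longer exists.
function auditAssetLinks(assets: readonly AssetRow[], liveTaskIds: ReadonlySet<string>) {
  const unlinked: string[] = []; // sourceTaskId was never set
  const orphaned: string[] = []; // sourceTaskId refers to a task that is gone
  for (const asset of assets) {
    if (asset.sourceTaskId === null) unlinked.push(asset.id);
    else if (!liveTaskIds.has(asset.sourceTaskId)) orphaned.push(asset.id);
  }
  return { unlinked, orphaned };
}
```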
Pattern 7: content-job-store.ts (service layer)
What: CRUD service for content_jobs table. When to use: Create and update jobs from routes + runner.
```ts
// server/src/services/content-job-store.ts
// Pattern source: server/src/services/assets.ts, heartbeat.ts
import { desc, eq } from "drizzle-orm";
// Import path assumed; mirror what server/src/services/assets.ts uses for Db and tables.
import { contentJobs, type Db } from "@paperclipai/db";

export function contentJobStore(db: Db) {
  return {
    create: (companyId: string, data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null }) =>
      db.insert(contentJobs).values({ companyId, ...data }).returning().then((r) => r[0]),
    getById: (id: string) =>
      db.select().from(contentJobs).where(eq(contentJobs.id, id)).then((r) => r[0] ?? null),
    listByCompany: (companyId: string) =>
      db.select().from(contentJobs)
        .where(eq(contentJobs.companyId, companyId))
        .orderBy(desc(contentJobs.createdAt)),
    transition: (id: string, patch: Partial<typeof contentJobs.$inferInsert>) =>
      db.update(contentJobs).set({ ...patch, updatedAt: new Date() }).where(eq(contentJobs.id, id)),
  };
}
```
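The Wave 0 unit tests can exercise the runner without Postgres by swapping in an object with the same method names. The fake below is a sketch. It assumes the Pattern 1 row shape, trimmed to the fields the runner touches, and `inMemoryContentJobStore` is a hypothetical name.

```typescript
import { randomUUID } from "node:crypto";

interface ContentJobRow {
  id: string;
  companyId: string;
  jobType: string;
  status: "queued" | "running" | "done" | "failed";
  input: Record<string, unknown>;
  sourceTaskId: string | null;
  resultAssetId: string | null;
  errorMessage: string | null;
  createdAt: Date;
  updatedAt: Date;
}

// Same method names as contentJobStore(db), backed by a Map instead of Postgres.
// Returned rows are copies so tests cannot mutate stored state by accident.
function inMemoryContentJobStore() {
  const rows = new Map<string, ContentJobRow>();
  return {
    async create(
      companyId: string,
      data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null },
    ) {
      const now = new Date();
      const row: ContentJobRow = {
        id: randomUUID(),
        companyId,
        ...data,
        status: "queued",
        resultAssetId: null,
        errorMessage: null,
        createdAt: now,
        updatedAt: now,
      };
      rows.set(row.id, row);
      return { ...row };
    },
    async getById(id: string) {
      const row = rows.get(id);
      return row ? { ...row } : null;
    },
    async listByCompany(companyId: string) {
      return Array.from(rows.values())
        .filter((r) => r.companyId === companyId)
        .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime())
        .map((r) => ({ ...r }));
    },
    async transition(id: string, patch: Partial<ContentJobRow>) {
      const row = rows.get(id);
      if (row) rows.set(id, { ...row, ...patch, updatedAt: new Date() });
    },
  };
}
```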
Anti-Patterns to Avoid
- Awaiting render in HTTP handler: Never await renderer.run() in the route handler. Always void dispatch(job) and return 202.
- Using multer for generated asset storage: Job runners call storage.putFile() directly; multer is only for user uploads.
- Hardcoding status strings: Always use the typed CONTENT_JOB_STATUSES constant from shared, not raw strings.
- Blocking SSE on DB polling: The SSE endpoint subscribes to the EventEmitter via subscribeCompanyLiveEvents, not a polling loop.
- Missing source_task_id in job creation: Every job submission should pass sourceTaskId from the incoming request (even if null for now); the column prevents future orphan accumulation.
Don't Hand-Roll
| Problem | Don't Build | Use Instead | Why |
|---|---|---|---|
| In-process pub/sub for SSE | Custom EventEmitter wrapper | publishLiveEvent + subscribeCompanyLiveEvents from live-events.ts | Already handles multi-company scoping, id sequencing |
| Storage path generation | Custom UUID + date path builder | StorageService.putFile() via service.ts | Already handles namespace normalization, sha256, objectKey construction |
| Migration execution | Custom SQL runner | pnpm db:generate then pnpm db:migrate | Existing drizzle-kit + custom migration runner in client.ts |
| HTTP route mounting | Ad-hoc app.use() | Follow app.ts pattern: create router fn, import in app.ts | Consistent middleware application (auth, actor, etc.) |
| Asset DB record | Custom insert | assetService(db).create() | Already handles the full asset shape |
Common Pitfalls
Pitfall 1: Forgetting that the generated-asset size logic lives in the runner, not the route
What goes wrong: Developer adds byte-limit check in the new content-job route handler (treating it like the assets upload route), rejecting large files before they're even rendered.
Why it happens: The upload route pattern uses multer limits; the developer copies that pattern.
How to avoid: The content-jobs route only creates the job record (tiny JSON). The runner calls storage.putFile() directly — no multer anywhere. The size check (MAX_GENERATED_ASSET_BYTES) belongs in the job runner, after rendering, before writing.
Warning signs: Any import of multer in content-jobs.ts or content-job-runner.ts.
Pitfall 2: SSE connection leaking if client disconnects mid-job
What goes wrong: Client disconnects; the unsubscribe callback is never called; the EventEmitter listener accumulates. With many requests this can trigger Node's MaxListeners warning.
Why it happens: SSE streams need explicit req.on("close") cleanup.
How to avoid: Always register req.on("close", () => { unsubscribe(); }) in every SSE handler. See live-events-ws.ts cleanupByClient pattern.
Warning signs: MaxListeners exceeded warning in server logs.
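A bare EventEmitter is enough to demonstrate the leak and to guard against it in a test. The sketch assumes subscribeCompanyLiveEvents behaves like the hypothetical `subscribe` below: it registers a listener and returns an unsubscribe function. The real live-events.ts may differ in detail.

```typescript
import { EventEmitter } from "node:events";

const bus = new EventEmitter();

// Hypothetical stand-in for subscribeCompanyLiveEvents.
function subscribe(companyId: string, listener: (payload: unknown) => void): () => void {
  bus.on(companyId, listener);
  let active = true;
  return () => {
    if (!active) return; // safe to call from both the terminal-event and "close" paths
    active = false;
    bus.off(companyId, listener);
  };
}
```

A test can open several subscriptions, tear them all down, and assert that `bus.listenerCount(companyId)` is back to zero.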
Pitfall 3: Publishing live events before the DB row is committed
What goes wrong: Browser receives content_job.done event, queries /content-jobs/:id, gets stale data (or 404 if the row hasn't flushed).
Why it happens: publishLiveEvent is synchronous; the DB write is async.
How to avoid: Always await db.update(...) before calling publishLiveEvent(...) in the job runner.
Warning signs: Frontend shows "done" but API returns "running".
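One small helper can capture the ordering rule, so no call site can get it wrong. In this sketch, `commitThenPublish` is a hypothetical name, and `persist` and `publish` stand in for store.transition() and publishLiveEvent().

```typescript
// Await the durable write first; only then notify subscribers.
async function commitThenPublish<E>(
  persist: () => Promise<void>,
  publish: (event: E) => void,
  event: E,
): Promise<void> {
  await persist(); // if the write rejects, nothing is published
  publish(event);
}
```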
Pitfall 4: Migration numbering collision
What goes wrong: Two migrations are created with the same prefix (e.g., 0056_) and drizzle-kit fails to apply them in order.
Why it happens: Parallel development or rebasing creates numbering conflicts.
How to avoid: Check ls packages/db/src/migrations/ before running pnpm db:generate. The last file is currently 0055_create_push_subscriptions.sql. New migrations start at 0056_.
Warning signs: pnpm db:generate creates a file with a duplicate number.
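The manual `ls` check can be scripted. Given the migration file names, the sketch below reports duplicate numeric prefixes and the next free one. It assumes the four-digit `NNNN_name.sql` convention used in this directory. In practice the list would come from fs.readdirSync on packages/db/src/migrations.

```typescript
// Report duplicate NNNN_ prefixes and the next free prefix.
function checkMigrationPrefixes(files: readonly string[]): { duplicates: string[]; next: string } {
  const counts = new Map<string, number>();
  for (const file of files) {
    const prefix = /^(\d{4})_.+\.sql$/.exec(file)?.[1];
    if (prefix !== undefined) counts.set(prefix, (counts.get(prefix) ?? 0) + 1);
  }
  const duplicates = Array.from(counts)
    .filter(([, count]) => count > 1)
    .map(([prefix]) => prefix);
  // Math.max(-1) keeps an empty directory at prefix 0000.
  const highest = Math.max(-1, ...Array.from(counts.keys(), (prefix) => Number(prefix)));
  return { duplicates, next: String(highest + 1).padStart(4, "0") };
}
```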
Pitfall 5: Forgetting to export from schema/index.ts and services/index.ts
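What goes wrong: Imports of the new table or service fail type-checking, or resolve to undefined at runtime wherever types are bypassed. If the drizzle-kit config points at the schema barrel, drizzle-kit also never sees the table, and pnpm db:generate emits no migration for it.
Why it happens: The project uses explicit barrel exports; nothing is auto-discovered.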
How to avoid: After adding content_jobs.ts schema and content-job-store.ts service, immediately add exports to packages/db/src/schema/index.ts and server/src/services/index.ts.
Pitfall 6: Adding content_job.* event types in the wrong place
What goes wrong: publishLiveEvent({ type: "content_job.running" }) fails type-checking because the type is not in LIVE_EVENT_TYPES.
Why it happens: LiveEventType is derived from LIVE_EVENT_TYPES as const in packages/shared/src/constants.ts.
How to avoid: Add the four new types (content_job.queued, content_job.running, content_job.done, content_job.failed) to the LIVE_EVENT_TYPES array in constants.ts before writing the runner.
Code Examples
Submitting a Job and Returning 202
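```ts
// Source: voice.ts (sync return), assets.ts (storage pattern), combined.
// db and storage come from the enclosing route factory (xRoutes(db, deps...) pattern).
router.post("/companies/:companyId/content-jobs", async (req, res) => {
  const companyId = req.params.companyId;
  assertCompanyAccess(req, companyId);
  const { jobType, input, sourceTaskId } = req.body as {
    jobType?: unknown;
    input?: Record<string, unknown>;
    sourceTaskId?: string;
  };
  if (typeof jobType !== "string" || jobType.length === 0) {
    res.status(400).json({ error: "jobType is required" });
    return;
  }
  const store = contentJobStore(db);
  const job = await store.create(companyId, {
    jobType,
    input: input ?? {},
    sourceTaskId: sourceTaskId ?? null,
  });
  publishLiveEvent({ companyId, type: "content_job.queued", payload: { jobId: job.id } });
  // Fire and forget, but never leave a rejection unhandled (use the project logger if available)
  void contentJobRunner.dispatch(db, storage, job).catch((err) => {
    console.error("content job dispatch failed", job.id, err);
  });
  res.status(202).json({
    jobId: job.id,
    status: job.status,
    createdAt: job.createdAt,
  });
});
```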
Publishing Progress from the Runner
```ts
// Source: live-events.ts publishLiveEvent pattern
// Assumed in scope: contentJobStore, assetService, publishLiveEvent, MAX_GENERATED_ASSET_BYTES.
// renderContent is a placeholder for the render pipeline that later phases provide.
async function runJob(db: Db, storage: StorageService, job: ContentJob) {
  const store = contentJobStore(db);
  // Transition to running: await the DB write before publishing (Pitfall 3)
  await store.transition(job.id, { status: "running", startedAt: new Date() });
  publishLiveEvent({
    companyId: job.companyId,
    type: "content_job.running",
    payload: { jobId: job.id },
  });
  try {
    const result = await renderContent(job.jobType, job.input);
    // Size check belongs here, after rendering and before writing (Pitfall 1)
    if (result.buffer.byteLength > MAX_GENERATED_ASSET_BYTES) {
      throw new Error(`Generated asset is ${result.buffer.byteLength} bytes; limit is ${MAX_GENERATED_ASSET_BYTES}`);
    }
    // Store asset under the generated/ namespace
    const stored = await storage.putFile({
      companyId: job.companyId,
      namespace: "generated",
      originalFilename: result.filename,
      contentType: result.contentType,
      body: result.buffer,
    });
    const asset = await assetService(db).create(job.companyId, {
      ...stored,
      sourceTaskId: job.sourceTaskId,
      createdByAgentId: null,
      createdByUserId: null,
    });
    // Transition to done
    await store.transition(job.id, { status: "done", resultAssetId: asset.id, finishedAt: new Date() });
    publishLiveEvent({
      companyId: job.companyId,
      type: "content_job.done",
      payload: { jobId: job.id, assetId: asset.id },
    });
  } catch (err) {
    const errorMessage = err instanceof Error ? err.message : "Unknown error";
    // If this write itself fails, runJob rejects; the caller of dispatch must handle that
    await store.transition(job.id, { status: "failed", errorMessage, finishedAt: new Date() });
    publishLiveEvent({
      companyId: job.companyId,
      type: "content_job.failed",
      payload: { jobId: job.id, errorMessage },
    });
  }
}
```
Migration for content_jobs table
```sql
-- packages/db/src/migrations/0056_create_content_jobs.sql
-- (generated by drizzle-kit; shown for reference)
CREATE TABLE IF NOT EXISTS "content_jobs" (
  "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
  "company_id" uuid NOT NULL REFERENCES "companies"("id"),
  "job_type" text NOT NULL,
  "status" text NOT NULL DEFAULT 'queued',
  "input" jsonb NOT NULL DEFAULT '{}',
  "result_asset_id" uuid,
  "error_message" text,
  "source_task_id" text,
  "started_at" timestamp with time zone,
  "finished_at" timestamp with time zone,
  "created_at" timestamp with time zone DEFAULT now() NOT NULL,
  "updated_at" timestamp with time zone DEFAULT now() NOT NULL
);
CREATE INDEX IF NOT EXISTS "content_jobs_company_status_idx" ON "content_jobs" ("company_id", "status");
CREATE INDEX IF NOT EXISTS "content_jobs_company_created_idx" ON "content_jobs" ("company_id", "created_at");
```
Migration for source_task_id on assets
```sql
-- packages/db/src/migrations/0057_assets_source_task_id.sql
ALTER TABLE "assets" ADD COLUMN IF NOT EXISTS "source_task_id" text;
CREATE INDEX IF NOT EXISTS "assets_source_task_id_idx" ON "assets" ("source_task_id");
```
State of the Art
| Old Approach | Current Approach | When Changed | Impact |
|---|---|---|---|
| Polling for job status | SSE push | Existing pattern in codebase | No polling loop needed on client |
| Blocking HTTP for render | HTTP 202 + async | Decision from STATE.md | Render time decoupled from response time |
| Flat asset storage | Namespaced storage (generated/, assets/general) | Existing service.ts pattern | No path collision between user uploads and generated output |
Already established in project:
- All DB schemas use Drizzle ORM with explicit migrations (not drizzle-kit push)
- Migration files are numbered sequentially starting at 0000_; next is 0056_
- Services follow a simple factory function pattern: export function xService(db: Db) { return { ... } }
- Routes follow the export function xRoutes(db, deps...) { const router = Router(); ... return router; } pattern
- All routes are mounted in server/src/app.ts
Environment Availability
Step 2.6: SKIPPED — Phase 40 is purely code and database changes. No external CLI tools, external services, or runtime binaries are required beyond Node.js 20, pnpm 9, and PostgreSQL already running.
Validation Architecture
Test Framework
| Property | Value |
|---|---|
| Framework | vitest (project dep) |
| Config file | server/vitest.config.ts — environment: "node" |
| Quick run command | pnpm test:run --project server -- --reporter=verbose src/__tests__/content-jobs* |
| Full suite command | pnpm test:run |
Phase Requirements → Test Map
| Req ID | Behavior | Test Type | Automated Command | File Exists? |
|---|---|---|---|---|
| INFRA-01 | POST /content-jobs returns 202 + jobId within 200ms | unit | pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-routes.test.ts | ❌ Wave 0 |
| INFRA-01 | Job transitions queued → running → done/failed | unit | same file | ❌ Wave 0 |
| INFRA-02 | SSE endpoint delivers progress events before terminal state | unit | pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-sse.test.ts | ❌ Wave 0 |
| INFRA-03 | storage.putFile with generated/ namespace stores bytes without size error | unit | pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-storage.test.ts | ❌ Wave 0 |
| INFRA-04 | Asset created by job runner includes sourceTaskId | unit | covered in content-jobs-routes.test.ts | ❌ Wave 0 |
Sampling Rate
- Per task commit: pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs*.test.ts
- Per wave merge: pnpm test:run
- Phase gate: full suite green before /gsd:verify-work
Wave 0 Gaps
- server/src/__tests__/content-jobs-routes.test.ts: covers INFRA-01, INFRA-04
- server/src/__tests__/content-jobs-sse.test.ts: covers INFRA-02
- server/src/__tests__/content-jobs-storage.test.ts: covers INFRA-03
(No new test framework needed — vitest already configured for server.)
Project Constraints (from CLAUDE.md)
CLAUDE.md does not exist at the project root. The constraints below are derived from STATE.md key decisions which carry the same authority:
- Async job pattern is mandatory: all render requests return 202 + job ID immediately; never block HTTP on render
- content_jobs table must exist before any renderer is built; this phase is the hard dependency for all others (phases 41–45)
- sourceTaskId required on every generated asset from day one; prevents SSD orphan accumulation
- MAX_GENERATED_ASSET_BYTES constant bypasses the 10MB upload limit for the generated/ namespace, separate from the upload route
- Async pattern: renderPipelineService stub must exist by end of phase (even as a no-op) so phase 41 can extend it
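STATE.md requires only that a renderPipelineService stub exist; it does not specify the interface. One hedged possibility follows. The method names `register` and `render` and the `RenderResult` shape are assumptions for phase 41 to revise; the shape mirrors the fields runJob reads (filename, contentType, buffer).

```typescript
interface RenderResult {
  filename: string;
  contentType: string;
  buffer: Buffer;
}
type Renderer = (input: Record<string, unknown>) => Promise<RenderResult>;

// No-op stub: phase 40 registers no renderers, so every render fails with a
// clear message, which the runner records as a failed job.
function renderPipelineService() {
  const renderers = new Map<string, Renderer>();
  return {
    register(jobType: string, renderer: Renderer): void {
      renderers.set(jobType, renderer);
    },
    async render(jobType: string, input: Record<string, unknown>): Promise<RenderResult> {
      const renderer = renderers.get(jobType);
      if (!renderer) throw new Error(`No renderer registered for job type "${jobType}"`);
      return renderer(input);
    },
  };
}
```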
Sources
Primary (HIGH confidence)
- Codebase: server/src/services/live-events.ts (EventEmitter pub/sub pattern for SSE fan-out)
- Codebase: server/src/routes/voice.ts (SSE headers pattern: text/event-stream, flushHeaders, res.write)
- Codebase: packages/db/src/schema/plugin_jobs.ts (job lifecycle table pattern: status, timestamps, logs)
- Codebase: packages/db/src/schema/assets.ts (asset table shape for INFRA-04 extension)
- Codebase: server/src/storage/service.ts (putFile has no byte limit; the limit is in multer, upload route only)
- Codebase: packages/shared/src/constants.ts (LIVE_EVENT_TYPES pattern to extend for job events)
- Codebase: server/src/attachment-types.ts (MAX_ATTACHMENT_BYTES = 10MB; MAX_GENERATED_ASSET_BYTES to be added here)
- Codebase: packages/db/src/migrations/ (last migration is 0055_; next is 0056_)
- Project STATE.md (locked architecture decisions)
Secondary (MEDIUM confidence)
- Pattern inference from heartbeat_runs / plugin_job_runs tables (same repo) for content_jobs shape
Tertiary (LOW confidence)
None — all findings are from direct codebase inspection.
Metadata
Confidence breakdown:
- Standard stack: HIGH — all verified from codebase
- Architecture: HIGH — directly modeled on existing patterns in same repo
- Pitfalls: HIGH — identified from direct code review of existing patterns
Research date: 2026-04-04
Valid until: 2026-05-04 (stable internal patterns)