nexus/.planning/phases/40-job-infrastructure/40-RESEARCH.md


Phase 40: Job Infrastructure - Research

Researched: 2026-04-04
Domain: Async job lifecycle, SSE streaming, storage namespacing, asset tracking
Confidence: HIGH


<user_constraints>

User Constraints (from CONTEXT.md)

Locked Decisions

All implementation choices are at Claude's discretion — discuss phase was skipped per user setting. Use ROADMAP phase goal, success criteria, and codebase conventions to guide decisions.

Claude's Discretion

All implementation choices are at Claude's discretion.

Deferred Ideas (OUT OF SCOPE)

None — discuss phase skipped. </user_constraints>


<phase_requirements>

Phase Requirements

| ID | Description | Research Support |
|----|-------------|------------------|
| INFRA-01 | System processes content generation jobs asynchronously with queued → running → done/failed lifecycle | New content_jobs table + in-process job runner service; pattern mirrors heartbeat_runs / plugin_job_runs already in codebase |
| INFRA-02 | System pushes job progress updates via SSE to connected clients | Existing live-events.ts EventEmitter + LIVE_EVENT_TYPES const; add new event types and a new SSE endpoint scoped to jobs |
| INFRA-03 | Generated content stored in namespaced storage without size restrictions blocking video/images | StorageService.putFile() already has no server-side byte limit; add a generated/ namespace written directly by the runner (so the upload-route multer limit never applies) plus a separate MAX_GENERATED_ASSET_BYTES constant |
| INFRA-04 | All generated content tracked in database with source conversation linkage | Extend assets table with a nullable source_task_id text column (no FK; task IDs are string identifiers); assetService.create() accepts an optional sourceTaskId |
</phase_requirements>

Summary

Phase 40 builds the async foundation that every subsequent content generation phase depends on. The codebase already has two well-established job-tracking patterns (heartbeat_runs, plugin_job_runs) and a working SSE streaming pattern in three routes (voice.ts, puter-proxy.ts, plugins.ts). The work here is additive: a new content_jobs DB table, a minimal in-process job runner, new live-event types for SSE progress, a generated/ storage namespace with its own size constant, and a source_task_id column on assets.

No external queue infrastructure (Redis, BullMQ) is needed: the project is single-user and local-first. An in-process async runner (a fire-and-forget void Promise) with EventEmitter fan-out, matching the heartbeat pattern, is the correct approach. The trade-off is durability. If the process crashes or restarts mid-job, in-flight work is lost and its row stays in queued or running, because nothing re-dispatches it. The source_task_id linkage lets consumers audit which generated output belongs to which task.
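The dispatch-and-fan-out shape can be illustrated without a database. The sketch below is a hypothetical in-memory model. The names createInMemoryRunner and JobEvent, and the work callback, are illustrative and do not come from the codebase. The real runner would persist each transition to content_jobs before publishing it through live-events.ts.

```typescript
import { EventEmitter } from "node:events";

type JobStatus = "queued" | "running" | "done" | "failed";

interface JobEvent {
  jobId: string;
  status: JobStatus;
  error?: string;
}

// Hypothetical in-memory model: one EventEmitter fans every status change out to all
// subscribers, the role live-events.ts plays in the real server. Nothing is persisted.
function createInMemoryRunner() {
  const bus = new EventEmitter();
  const statuses = new Map<string, JobStatus>();

  const setStatus = (jobId: string, status: JobStatus, error?: string) => {
    statuses.set(jobId, status);
    const event: JobEvent = { jobId, status, error };
    bus.emit("job", event);
  };

  return {
    subscribe(listener: (event: JobEvent) => void): () => void {
      bus.on("job", listener);
      return () => {
        bus.off("job", listener);
      };
    },
    statusOf(jobId: string): JobStatus | undefined {
      return statuses.get(jobId);
    },
    // Marks the job queued synchronously and defers the work to a microtask, so the
    // caller can respond at once. A failing work() becomes a "failed" status instead
    // of a rejection, so callers can safely discard the returned promise with `void`.
    dispatch(jobId: string, work: () => Promise<void>): Promise<void> {
      setStatus(jobId, "queued");
      return Promise.resolve()
        .then(() => {
          setStatus(jobId, "running");
          return work();
        })
        .then(
          () => setStatus(jobId, "done"),
          (err: unknown) => setStatus(jobId, "failed", err instanceof Error ? err.message : "Unknown error"),
        );
    },
  };
}
```

dispatch() marks the job queued before it returns, and its promise settles only after the work finishes. A route handler can therefore void it and answer 202 immediately.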

Primary recommendation: Mirror the heartbeat_runs table/service pattern for the content_jobs table. Use the existing publishLiveEvent function with new content_job.* event types for SSE. Add MAX_GENERATED_ASSET_BYTES as a server-only constant and a generated/ namespace prefix. Add source_task_id to assets via a migration.


Standard Stack

Core

| Library | Version (verified) | Purpose | Why Standard |
|---------|--------------------|---------|--------------|
| drizzle-orm | 0.38.4 (pkg) | Schema definition + query builder | Already used throughout; schema in packages/db/src/schema/ |
| postgres (postgres.js) | project dep | DB connection | Already used via createDb() in packages/db/src/client.ts |
| express | project dep | HTTP layer for new routes | All routes are Express; follows existing pattern |
| Node.js EventEmitter | built-in | In-process pub/sub for SSE fan-out | Already used in live-events.ts; no extra dep |

Supporting

| Library | Version | Purpose | When to Use |
|---------|---------|---------|-------------|
| drizzle-kit | 0.31.9 (pkg) | Migration generation | Run pnpm db:generate after schema change |
| vitest | project dep | Unit + integration tests | All server tests use vitest; pattern in server/src/__tests__/ |
| supertest | project dep | HTTP route tests | Used in assets.test.ts, chat-routes.test.ts, etc. |

Alternatives Considered

| Instead of | Could Use | Tradeoff |
|------------|-----------|----------|
| In-process EventEmitter runner | BullMQ / Redis | Redis adds infra complexity; the app is single-user and single-process, so EventEmitter is correct here |
| In-process EventEmitter runner | Worker threads | Unnecessary isolation; jobs are I/O-bound (renders call child processes) |
| Custom SSE endpoint | WebSocket upgrade | SSE is simpler for one-way server → client push. WebSocket is already used for live events via live-events-ws.ts, but per-job progress streams should use SSE, following the existing voice/puter patterns |

Installation: No new packages required. All tooling already present.


Architecture Patterns

packages/db/src/schema/
├── content_jobs.ts          # NEW: content_jobs table definition
├── assets.ts                # MODIFY: add source_task_id column
├── index.ts                 # MODIFY: export content_jobs

packages/db/src/migrations/
├── 0056_create_content_jobs.sql        # NEW: generated via drizzle-kit
├── 0057_assets_source_task_id.sql      # NEW: generated via drizzle-kit

packages/shared/src/
├── constants.ts             # MODIFY: add content_job.* to LIVE_EVENT_TYPES
                             #         add CONTENT_JOB_STATUSES constant

server/src/
├── services/
│   ├── content-job-store.ts    # NEW: DB CRUD for content_jobs
│   ├── content-job-runner.ts   # NEW: async executor + live-event publisher
│   └── index.ts                # MODIFY: export new services
├── routes/
│   ├── content-jobs.ts         # NEW: POST /companies/:id/content-jobs
│   │                           #      GET  /companies/:id/content-jobs/:jobId
│   │                           #      GET  /companies/:id/content-jobs/:jobId/events (SSE)
│   └── index.ts                # MODIFY: mount content-job routes
├── app.ts                      # MODIFY: register routes

Pattern 1: content_jobs Table (mirrors heartbeat_runs / plugin_job_runs)

What: A persisted lifecycle table for async content generation requests. When to use: Any content generation work that may take >200ms.

// packages/db/src/schema/content_jobs.ts
// Pattern source: packages/db/src/schema/heartbeat_runs.ts + plugin_jobs.ts

import { pgTable, uuid, text, timestamp, jsonb, index } from "drizzle-orm/pg-core";
import { companies } from "./companies.js";

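// Status lifecycle: queued → running → done | failed
// Per the file layout above, the canonical CONTENT_JOB_STATUSES lives in
// packages/shared/src/constants.ts; it is inlined here to keep the example self-contained.
export const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const;
export type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number];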

export const contentJobs = pgTable(
  "content_jobs",
  {
    id: uuid("id").primaryKey().defaultRandom(),
    companyId: uuid("company_id").notNull().references(() => companies.id),
    jobType: text("job_type").notNull(),          // e.g. "diagram", "theme", "video"
    status: text("status").$type<ContentJobStatus>().notNull().default("queued"),
    input: jsonb("input").notNull().default({}),  // renderer-specific params
    resultAssetId: uuid("result_asset_id"),        // populated on done
    errorMessage: text("error_message"),           // populated on failed
    sourceTaskId: text("source_task_id"),          // conversation task linkage (INFRA-04)
    startedAt: timestamp("started_at", { withTimezone: true }),
    finishedAt: timestamp("finished_at", { withTimezone: true }),
    createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
    updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
  },
  (table) => ({
    companyStatusIdx: index("content_jobs_company_status_idx").on(table.companyId, table.status),
    companyCreatedIdx: index("content_jobs_company_created_idx").on(table.companyId, table.createdAt),
  }),
);
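Status is a plain text column, so neither Postgres nor Drizzle stops a job from moving backwards (for example done → running). A small guard in the store or runner can enforce the documented lifecycle. The ALLOWED table, canTransition, assertTransition, and isTerminal are hypothetical helpers. Allowing queued → failed is an assumption: it lets a job that fails before it starts still be closed out.

```typescript
const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const;
type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number];

// Allowed next states per status: queued → running → done | failed.
// queued → failed is an assumption; terminal states have no successors.
const ALLOWED: Record<ContentJobStatus, readonly ContentJobStatus[]> = {
  queued: ["running", "failed"],
  running: ["done", "failed"],
  done: [],
  failed: [],
};

function canTransition(from: ContentJobStatus, to: ContentJobStatus): boolean {
  return ALLOWED[from].includes(to);
}

// Throws instead of silently writing an out-of-order status.
function assertTransition(from: ContentJobStatus, to: ContentJobStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal content job transition: ${from} -> ${to}`);
  }
}

function isTerminal(status: ContentJobStatus): boolean {
  return ALLOWED[status].length === 0;
}
```

The same isTerminal check is what an SSE handler needs when deciding whether to close the stream.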

Pattern 2: HTTP 202 + Job ID Response

What: POST to submit a job returns immediately with jobId. When to use: All content generation submissions.

// server/src/routes/content-jobs.ts (inside the routes factory, where db and storage are in scope)
router.post("/companies/:companyId/content-jobs", async (req, res) => {
  const companyId = req.params.companyId;
  assertCompanyAccess(req, companyId);
  const job = await contentJobStore(db).create(companyId, {
    jobType: req.body.jobType,
    input: req.body.input ?? {},
    sourceTaskId: req.body.sourceTaskId ?? null,
  });
  // Fire and forget: never await the runner here
  void contentJobRunner.dispatch(db, storage, job);
  res.status(202).json({ jobId: job.id, status: job.status });
});
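Pattern 2 passes req.body fields straight to the store, so a missing jobType surfaces as a database NOT NULL error rather than a 400. A parser like the one below could run before the insert. parseContentJobRequest and ParseResult are illustrative names; the field names come from the route above.

```typescript
interface ContentJobRequest {
  jobType: string;
  input: Record<string, unknown>;
  sourceTaskId: string | null;
}

type ParseResult = { ok: true; value: ContentJobRequest } | { ok: false; error: string };

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Validates an untrusted JSON body before anything is written to content_jobs.
function parseContentJobRequest(body: unknown): ParseResult {
  if (!isPlainObject(body)) return { ok: false, error: "Body must be a JSON object" };

  const { jobType, input, sourceTaskId } = body;
  if (typeof jobType !== "string" || jobType.trim() === "") {
    return { ok: false, error: "jobType must be a non-empty string" };
  }
  if (input !== undefined && !isPlainObject(input)) {
    return { ok: false, error: "input must be an object when provided" };
  }
  if (sourceTaskId !== undefined && sourceTaskId !== null && typeof sourceTaskId !== "string") {
    return { ok: false, error: "sourceTaskId must be a string or null when provided" };
  }

  return {
    ok: true,
    value: {
      jobType,
      input: isPlainObject(input) ? input : {},
      sourceTaskId: typeof sourceTaskId === "string" ? sourceTaskId : null,
    },
  };
}
```

In the route, a result with ok set to false would map to res.status(400).json({ error }) before any row is created.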

Pattern 3: SSE Job Progress (mirrors voice.ts pattern)

What: GET endpoint that holds the connection open and pushes events until the job reaches a terminal state. When to use: The browser needs job progress without a polling loop.

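// server/src/routes/content-jobs.ts
router.get("/companies/:companyId/content-jobs/:jobId/events", async (req, res) => {
  const { companyId, jobId } = req.params;
  assertCompanyAccess(req, companyId);

  // Look the job up before switching to an event stream, so unknown ids get a plain 404
  const store = contentJobStore(db);
  const job = await store.getById(jobId);
  if (!job || job.companyId !== companyId) {
    res.status(404).json({ error: "Content job not found" });
    return;
  }

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders();

  const isTerminal = (status: string) => status === "done" || status === "failed";
  const sendEvent = (type: string, data: unknown) => {
    if (res.writableEnded) return; // never write after res.end()
    res.write(`event: ${type}\ndata: ${JSON.stringify(data)}\n\n`);
  };

  // Subscribe before reading the current state, so a transition committed in between is not lost.
  // The runner publishes content_job.<status> events, so the status is the type suffix.
  const unsubscribe = subscribeCompanyLiveEvents(companyId, (event) => {
    if (!event.type.startsWith("content_job.") || event.payload.jobId !== jobId) return;
    const status = event.type.slice("content_job.".length);
    sendEvent("status", { ...event.payload, status });
    if (isTerminal(status)) {
      unsubscribe();
      res.end();
    }
  });

  req.on("close", () => {
    unsubscribe();
  });

  // Send current state immediately
  const current = (await store.getById(jobId)) ?? job;
  sendEvent("status", { jobId, status: current.status });
  if (isTerminal(current.status) && !res.writableEnded) {
    unsubscribe();
    res.end();
  }
});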
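Each sendEvent call above writes one SSE frame: an event: line, a data: line, and a blank line. The sketch below formats and parses those frames in isolation, which may be useful for the SSE route test. formatSseFrame and parseSseFrames are hypothetical helpers. The parser assumes \n line endings (what sendEvent writes) and handles only the event:/data: subset this endpoint emits. It ignores id:, retry:, and comment lines.

```typescript
interface SseFrame {
  event: string;
  data: unknown;
}

// Mirrors sendEvent(): one `event:` line, one `data:` line, then a blank line.
// JSON.stringify never emits raw newlines, so a single data line is always enough.
function formatSseFrame(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Parses a buffered stream body back into frames (event/data fields only).
function parseSseFrames(body: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of body.split("\n\n")) {
    if (block.trim() === "") continue;
    let event = "message"; // SSE default event type when no event: line is present
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line.startsWith("event:")) event = line.slice(6).trim();
      // Per the SSE format, one leading space after the colon is not part of the value
      else if (line.startsWith("data:")) dataLines.push(line.slice(5).replace(/^ /, ""));
    }
    if (dataLines.length > 0) frames.push({ event, data: JSON.parse(dataLines.join("\n")) });
  }
  return frames;
}
```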

Pattern 4: Live Event Types for Job Progress

What: Extend LIVE_EVENT_TYPES in shared constants. When to use: Publishing job progress from the runner.

// packages/shared/src/constants.ts — add to LIVE_EVENT_TYPES array:
"content_job.queued",
"content_job.running",
"content_job.done",
"content_job.failed",

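LiveEventType is derived from the as const array, so the four new strings join the union as soon as they are added. The sketch below uses a trimmed stand-in for LIVE_EVENT_TYPES. It adds hypothetical helpers, isContentJobEventType and statusFromEventType, that map a content_job.* type back to a job status. That mapping is what an SSE handler needs to decide when to close the stream.

```typescript
// Trimmed stand-in for LIVE_EVENT_TYPES in packages/shared/src/constants.ts;
// the real array also contains the existing event types.
const LIVE_EVENT_TYPES = [
  "content_job.queued",
  "content_job.running",
  "content_job.done",
  "content_job.failed",
] as const;

type LiveEventType = (typeof LIVE_EVENT_TYPES)[number];
type ContentJobEventType = Extract<LiveEventType, `content_job.${string}`>;
// Distributes over the union, yielding "queued" | "running" | "done" | "failed"
type StatusOf<T> = T extends `content_job.${infer S}` ? S : never;
type ContentJobEventStatus = StatusOf<ContentJobEventType>;

const CONTENT_JOB_PREFIX = "content_job.";

// Runtime guard for type strings that arrive untyped (for example from a parsed payload).
function isContentJobEventType(type: string): type is ContentJobEventType {
  return type.startsWith(CONTENT_JOB_PREFIX) && (LIVE_EVENT_TYPES as readonly string[]).includes(type);
}

function statusFromEventType(type: ContentJobEventType): ContentJobEventStatus {
  return type.slice(CONTENT_JOB_PREFIX.length) as ContentJobEventStatus;
}
```

An undeclared string such as "content_job.status" fails the runtime guard. Written as a LiveEventType literal, it also fails type-checking.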
Pattern 5: Generated Asset Storage Namespace

What: A generated/ storage namespace for job output. The runner writes directly via storage.putFile(), so the upload-route multer limit never applies. When to use: Writing rendered output (video, SVG, PDF, PNG) from the job runner.

// server/src/attachment-types.ts: add alongside MAX_ATTACHMENT_BYTES
export const MAX_GENERATED_ASSET_BYTES =
  Number(process.env.PAPERCLIP_GENERATED_ASSET_MAX_BYTES) || 500 * 1024 * 1024; // 500MB default

// Job runner validates size after rendering, then stores via:
if (outputBuffer.length > MAX_GENERATED_ASSET_BYTES) {
  throw new Error(`Generated asset exceeds ${MAX_GENERATED_ASSET_BYTES} bytes`);
}
const stored = await storage.putFile({
  companyId,
  namespace: "generated",   // written directly by the runner; multer is not in this path
  originalFilename: outputFilename,
  contentType,
  body: outputBuffer,       // renderer output
});

Key insight: The upload route (assets.ts) enforces limits via multer limits: { fileSize: MAX_ATTACHMENT_BYTES }. Job runners write directly to storage.putFile() — multer is never involved. The MAX_GENERATED_ASSET_BYTES constant exists for the job runner to validate before calling putFile, but putFile itself has no byte limit.
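The Number(...) || 500 * 1024 * 1024 idiom falls back to the default for unset, empty, 0, or non-numeric values. However, it accepts negative and fractional numbers. The sketch below shows a stricter parser and the runner-side check. parseByteLimit and assertGeneratedAssetSize are hypothetical names.

```typescript
const DEFAULT_GENERATED_ASSET_MAX_BYTES = 500 * 1024 * 1024; // 500 MB, matching the default above

// Accepts only positive safe integers; anything else falls back to the default.
function parseByteLimit(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = Number(raw);
  return Number.isSafeInteger(value) && value > 0 ? value : fallback;
}

// Runner-side check: after rendering, before storage.putFile().
function assertGeneratedAssetSize(byteLength: number, limit: number): void {
  if (byteLength > limit) {
    throw new Error(`Generated asset is ${byteLength} bytes; limit is ${limit} bytes`);
  }
}
```

In the server, the constant would be parseByteLimit(process.env.PAPERCLIP_GENERATED_ASSET_MAX_BYTES, DEFAULT_GENERATED_ASSET_MAX_BYTES). The runner would call assertGeneratedAssetSize(result.buffer.length, MAX_GENERATED_ASSET_BYTES) just before putFile.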

Pattern 6: source_task_id on assets (INFRA-04)

What: Nullable column added to assets table. When to use: Every asset created by a job runner must pass sourceTaskId.

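// packages/db/src/schema/assets.ts: add column
sourceTaskId: text("source_task_id"),  // nullable, no FK: task IDs are string identifiers

// Declare the index in the assets table's index callback too, so the generated
// 0057 migration contains assets_source_task_id_idx:
// sourceTaskIdx: index("assets_source_task_id_idx").on(table.sourceTaskId),

// assetService.create() in server/src/services/assets.ts accepts it through
// the existing spread pattern: db.insert(assets).values({ ...data, companyId }).
// Since the column is nullable, no existing callers break. If create()'s data
// parameter is a hand-written type rather than one inferred from the table,
// add an optional sourceTaskId field to it as well.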
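With source_task_id populated, auditing generated output reduces to grouping and filtering by that column. The sketch below works on plain objects rather than the Drizzle query it would be in practice. The AssetRow shape and the liveTaskIds set are assumptions for illustration; the caller would obtain the currently known task IDs however it can. In SQL terms, the orphan check starts from a source_task_id IS NOT NULL filter, and the 0057 index supports per-task lookups.

```typescript
interface AssetRow {
  id: string;
  sourceTaskId: string | null; // null for user uploads and rows created before the migration
}

// Groups job-generated assets by the task that produced them; unlinked rows are skipped.
function groupBySourceTask(assets: readonly AssetRow[]): Map<string, string[]> {
  const groups = new Map<string, string[]>();
  for (const asset of assets) {
    if (asset.sourceTaskId === null) continue;
    const ids = groups.get(asset.sourceTaskId) ?? [];
    ids.push(asset.id);
    groups.set(asset.sourceTaskId, ids);
  }
  return groups;
}

// Assets whose linked task is no longer known: candidates for cleanup.
function findOrphanedAssets(assets: readonly AssetRow[], liveTaskIds: ReadonlySet<string>): AssetRow[] {
  return assets.filter((a) => a.sourceTaskId !== null && !liveTaskIds.has(a.sourceTaskId));
}
```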

Pattern 7: content-job-store.ts (service layer)

What: CRUD service for content_jobs table. When to use: Create and update jobs from routes + runner.

// server/src/services/content-job-store.ts
// Pattern source: server/src/services/assets.ts, heartbeat.ts
import { desc, eq } from "drizzle-orm";
// Assumed import path: mirror whatever server/src/services/assets.ts uses for Db and schema tables.
import { contentJobs, type Db } from "@paperclipai/db";

export function contentJobStore(db: Db) {
  return {
    create: (companyId: string, data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null }) =>
      db.insert(contentJobs).values({ companyId, ...data }).returning().then((r) => r[0]),

    getById: (id: string) =>
      db.select().from(contentJobs).where(eq(contentJobs.id, id)).then((r) => r[0] ?? null),

    listByCompany: (companyId: string) =>
      db.select().from(contentJobs)
        .where(eq(contentJobs.companyId, companyId))
        .orderBy(desc(contentJobs.createdAt)),

    transition: (id: string, patch: Partial<typeof contentJobs.$inferInsert>) =>
      db.update(contentJobs).set({ ...patch, updatedAt: new Date() }).where(eq(contentJobs.id, id)),
  };
}
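For route tests that should not need Postgres, an in-memory object with the same method names as contentJobStore can stand in. This assumes the routes receive the store (or a store factory) as an injected dependency. The xRoutes(db, deps...) convention suggests that, but this research has not confirmed it. createInMemoryContentJobStore and the job-N id scheme are hypothetical.

```typescript
type ContentJobStatus = "queued" | "running" | "done" | "failed";

interface ContentJobRow {
  id: string;
  companyId: string;
  jobType: string;
  status: ContentJobStatus;
  input: Record<string, unknown>;
  sourceTaskId: string | null;
  resultAssetId: string | null;
  errorMessage: string | null;
  createdAt: Date;
  updatedAt: Date;
}

// Same method names and async signatures as contentJobStore(db), backed by a Map.
function createInMemoryContentJobStore() {
  const rows = new Map<string, ContentJobRow>();
  let seq = 0;

  return {
    async create(companyId: string, data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null }) {
      const now = new Date();
      const row: ContentJobRow = {
        id: `job-${++seq}`,
        companyId,
        status: "queued", // mirrors the column default
        resultAssetId: null,
        errorMessage: null,
        createdAt: now,
        updatedAt: now,
        ...data,
      };
      rows.set(row.id, row);
      return row;
    },
    async getById(id: string) {
      return rows.get(id) ?? null;
    },
    async listByCompany(companyId: string) {
      return [...rows.values()]
        .filter((r) => r.companyId === companyId)
        .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());
    },
    async transition(id: string, patch: Partial<ContentJobRow>) {
      const row = rows.get(id);
      if (row) rows.set(id, { ...row, ...patch, updatedAt: new Date() });
    },
  };
}
```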

Anti-Patterns to Avoid

  • Awaiting render in HTTP handler: Never await renderer.run() in the route handler. Always void dispatch(job) and return 202.
  • Using multer for generated asset storage: Job runners call storage.putFile() directly; multer is only for user uploads.
  • Hardcoding status strings: Always use the typed CONTENT_JOB_STATUSES constant from shared, not raw strings.
  • Blocking SSE on DB polling: SSE endpoint subscribes to EventEmitter via subscribeCompanyLiveEvents, not a polling loop.
  • Missing source_task_id in job creation: Every job submission should pass sourceTaskId from the incoming request (even if null for now); the column prevents future orphan accumulation.

Don't Hand-Roll

| Problem | Don't Build | Use Instead | Why |
|---------|-------------|-------------|-----|
| In-process pub/sub for SSE | Custom EventEmitter wrapper | publishLiveEvent + subscribeCompanyLiveEvents from live-events.ts | Already handles multi-company scoping, id sequencing |
| Storage path generation | Custom UUID + date path builder | StorageService.putFile() via service.ts | Already handles namespace normalization, sha256, objectKey construction |
| Migration execution | Custom SQL runner | pnpm db:generate then pnpm db:migrate | Existing drizzle-kit + custom migration runner in client.ts |
| HTTP route mounting | Ad-hoc app.use() | Follow app.ts pattern: create router fn, import in app.ts | Consistent middleware application (auth, actor, etc.) |
| Asset DB record | Custom insert | assetService(db).create() | Already handles the full asset shape |

Common Pitfalls

Pitfall 1: Putting the generated-asset size check in the route instead of the runner

What goes wrong: A developer adds a byte-limit check to the new content-jobs route handler (treating it like the assets upload route), rejecting large files before they are even rendered.
Why it happens: The upload route enforces its limit through multer, and the developer copies that pattern.
How to avoid: The content-jobs route only creates the job record (tiny JSON). The runner calls storage.putFile() directly, with no multer anywhere. The size check (MAX_GENERATED_ASSET_BYTES) belongs in the job runner, after rendering and before writing.
Warning signs: Any import of multer in content-jobs.ts or content-job-runner.ts.

Pitfall 2: SSE connection leaking if client disconnects mid-job

What goes wrong: The client disconnects, the unsubscribe callback is never called, and the EventEmitter listener accumulates. With many requests this can trigger Node's MaxListeners warning.
Why it happens: SSE streams need explicit req.on("close") cleanup.
How to avoid: Always register req.on("close", () => { unsubscribe(); }) in every SSE handler. See the cleanupByClient pattern in live-events-ws.ts.
Warning signs: MaxListeners exceeded warning in server logs.
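The leak can be reproduced with a bare node:events EventEmitter. Each simulated SSE request adds a listener, and only an explicit unsubscribe removes it. subscribe and openStream are hypothetical stand-ins; the real subscribeCompanyLiveEvents may key its listeners differently. Once more than ten listeners share one event name, Node prints a MaxListenersExceededWarning, because EventEmitter.defaultMaxListeners is 10.

```typescript
import { EventEmitter } from "node:events";

const bus = new EventEmitter();

// Stand-in for subscribeCompanyLiveEvents: returns an idempotent unsubscribe function.
function subscribe(companyId: string, listener: (payload: unknown) => void): () => void {
  const channel = `company:${companyId}`; // hypothetical channel naming
  bus.on(channel, listener);
  let active = true;
  return () => {
    if (!active) return;
    active = false;
    bus.off(channel, listener);
  };
}

// Simulates one SSE request. The returned function models the client disconnecting;
// with cleanUp = true it behaves like req.on("close", unsubscribe).
function openStream(companyId: string, cleanUp: boolean): () => void {
  const unsubscribe = subscribe(companyId, () => {});
  return cleanUp ? unsubscribe : () => {};
}
```

Making unsubscribe idempotent matters because an SSE handler may call it twice: once on a terminal event and again when the request closes.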

Pitfall 3: Publishing live events before the DB row is committed

What goes wrong: The browser receives a content_job.done event, queries /content-jobs/:id, and gets stale data (or a 404 if the row hasn't been written yet).
Why it happens: publishLiveEvent is synchronous; the DB write is async.
How to avoid: Always await db.update(...) before calling publishLiveEvent(...) in the job runner.
Warning signs: Frontend shows "done" but API returns "running".
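The ordering rule can be captured in a tiny helper so the runner cannot get it wrong. transitionAndPublish is hypothetical; write stands in for store.transition and publish for publishLiveEvent.

```typescript
type Publish<E> = (event: E) => void;

// Awaits the DB write first. The event is published only after the write resolves,
// and never if it rejects, so subscribers cannot observe a state the DB lacks.
async function transitionAndPublish<E>(
  write: () => Promise<unknown>,
  publish: Publish<E>,
  event: E,
): Promise<void> {
  await write();
  publish(event);
}
```

If the write runs inside a transaction, publish after the transaction itself resolves, not after the inner update.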

Pitfall 4: Migration numbering collision

What goes wrong: Two migrations are created with the same prefix (e.g., 0056_), and they are not applied in the intended order.
Why it happens: Parallel development or rebasing creates numbering conflicts.
How to avoid: Check ls packages/db/src/migrations/ before running pnpm db:generate. The last file is currently 0055_create_push_subscriptions.sql, so new migrations start at 0056_.
Warning signs: pnpm db:generate creates a file with a duplicate number.
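A quick guard against collisions is to parse the numeric prefixes around a pnpm db:generate run. The function below works on a list of filenames so it can run anywhere. checkMigrationNumbers is hypothetical and assumes the NNNN_name.sql convention seen in packages/db/src/migrations/. The filenames in the test are made-up examples.

```typescript
interface MigrationCheck {
  duplicates: string[]; // four-digit prefixes used by more than one file
  gaps: number[];       // missing numbers between the first and last migration
  next: string;         // next free four-digit prefix
}

function checkMigrationNumbers(filenames: readonly string[]): MigrationCheck {
  const countByNumber = new Map<number, number>();
  for (const name of filenames) {
    const match = /^(\d{4})_.*\.sql$/.exec(name);
    if (!match) continue; // skip non-migration entries such as a meta/ directory
    const n = Number(match[1]);
    countByNumber.set(n, (countByNumber.get(n) ?? 0) + 1);
  }

  const numbers = [...countByNumber.keys()].sort((a, b) => a - b);
  const pad = (n: number) => String(n).padStart(4, "0");
  const duplicates = numbers.filter((n) => (countByNumber.get(n) ?? 0) > 1).map(pad);

  const gaps: number[] = [];
  if (numbers.length > 0) {
    for (let n = numbers[0]; n < numbers[numbers.length - 1]; n++) {
      if (!countByNumber.has(n)) gaps.push(n);
    }
  }

  const last = numbers.length > 0 ? numbers[numbers.length - 1] : -1;
  return { duplicates, gaps, next: pad(last + 1) };
}
```

Passing it readdirSync("packages/db/src/migrations") from node:fs would turn it into a quick pre- or post-generate check.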

Pitfall 5: Forgetting to export from schema/index.ts and services/index.ts

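What goes wrong: Code importing contentJobs or contentJobStore through the package barrels cannot resolve them. In addition, if the drizzle-kit config reads the schema through packages/db/src/schema/index.ts, pnpm db:generate will not produce a migration for the new table.
Why it happens: The project uses explicit barrel exports; nothing discovers new modules automatically.
How to avoid: After adding the content_jobs.ts schema and the content-job-store.ts service, immediately add exports to packages/db/src/schema/index.ts and server/src/services/index.ts.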

Pitfall 6: Adding content_job.* event types in the wrong place

What goes wrong: publishLiveEvent({ type: "content_job.running" }) fails type-checking because the type is not in LIVE_EVENT_TYPES.
Why it happens: LiveEventType is derived from LIVE_EVENT_TYPES as const in packages/shared/src/constants.ts.
How to avoid: Add the four new types (content_job.queued, content_job.running, content_job.done, content_job.failed) to the LIVE_EVENT_TYPES array in constants.ts before writing the runner.


Code Examples

Submitting a Job and Returning 202

// Source: voice.ts (sync return), assets.ts (storage pattern) — combined
router.post("/companies/:companyId/content-jobs", async (req, res) => {
  const companyId = req.params.companyId;
  assertCompanyAccess(req, companyId);

  const { jobType, input, sourceTaskId } = req.body as {
    jobType: string;
    input?: Record<string, unknown>;
    sourceTaskId?: string;
  };

  const store = contentJobStore(db);
  const job = await store.create(companyId, {
    jobType,
    input: input ?? {},
    sourceTaskId: sourceTaskId ?? null,
  });

  void contentJobRunner.dispatch(db, storage, job);  // fire and forget

  res.status(202).json({
    jobId: job.id,
    status: job.status,
    createdAt: job.createdAt,
  });
});

Publishing Progress from the Runner

// Source: live-events.ts publishLiveEvent pattern
async function runJob(db: Db, storage: StorageService, job: ContentJob) {
  const store = contentJobStore(db);

  try {
    // Transition to running. This sits inside the try so an error here goes through
    // the failure path below instead of rejecting the promise that dispatch() discards.
    await store.transition(job.id, { status: "running", startedAt: new Date() });
    publishLiveEvent({
      companyId: job.companyId,
      type: "content_job.running",
      payload: { jobId: job.id },
    });

    const result = await renderContent(job.jobType, job.input);

    // Size check: after rendering, before writing (see Pitfall 1)
    if (result.buffer.length > MAX_GENERATED_ASSET_BYTES) {
      throw new Error(`Generated asset exceeds ${MAX_GENERATED_ASSET_BYTES} bytes`);
    }

    // Store asset
    const stored = await storage.putFile({
      companyId: job.companyId,
      namespace: "generated",
      originalFilename: result.filename,
      contentType: result.contentType,
      body: result.buffer,
    });
    const asset = await assetService(db).create(job.companyId, {
      ...stored,
      sourceTaskId: job.sourceTaskId,
      createdByAgentId: null,
      createdByUserId: null,
    });

    // Transition to done
    await store.transition(job.id, { status: "done", resultAssetId: asset.id, finishedAt: new Date() });
    publishLiveEvent({
      companyId: job.companyId,
      type: "content_job.done",
      payload: { jobId: job.id, assetId: asset.id },
    });
  } catch (err) {
    const errorMessage = err instanceof Error ? err.message : "Unknown error";
    await store.transition(job.id, { status: "failed", errorMessage, finishedAt: new Date() });
    publishLiveEvent({
      companyId: job.companyId,
      type: "content_job.failed",
      payload: { jobId: job.id, errorMessage },
    });
  }
}

Migration for content_jobs table

-- packages/db/src/migrations/0056_create_content_jobs.sql
-- (generated by drizzle-kit; shown for reference)
CREATE TABLE IF NOT EXISTS "content_jobs" (
  "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
  "company_id" uuid NOT NULL REFERENCES "companies"("id"),
  "job_type" text NOT NULL,
  "status" text NOT NULL DEFAULT 'queued',
  "input" jsonb NOT NULL DEFAULT '{}',
  "result_asset_id" uuid,
  "error_message" text,
  "source_task_id" text,
  "started_at" timestamp with time zone,
  "finished_at" timestamp with time zone,
  "created_at" timestamp with time zone DEFAULT now() NOT NULL,
  "updated_at" timestamp with time zone DEFAULT now() NOT NULL
);
CREATE INDEX IF NOT EXISTS "content_jobs_company_status_idx" ON "content_jobs" ("company_id", "status");
CREATE INDEX IF NOT EXISTS "content_jobs_company_created_idx" ON "content_jobs" ("company_id", "created_at");

Migration for source_task_id on assets

-- packages/db/src/migrations/0057_assets_source_task_id.sql
ALTER TABLE "assets" ADD COLUMN IF NOT EXISTS "source_task_id" text;
CREATE INDEX IF NOT EXISTS "assets_source_task_id_idx" ON "assets" ("source_task_id");

State of the Art

| Old Approach | Current Approach | When Changed | Impact |
|--------------|------------------|--------------|--------|
| Polling for job status | SSE push | Existing pattern in codebase | No polling loop needed on client |
| Blocking HTTP for render | HTTP 202 + async | Decision from STATE.md | Render time decoupled from response time |
| Flat asset storage | Namespaced storage (generated/, assets/general) | Existing service.ts pattern | No path collision between user uploads and generated output |

Already established in project:

  • All DB schemas use Drizzle ORM with explicit migrations (not drizzle.push)
  • Migration files are numbered sequentially starting at 0000_; next is 0056_
  • Services follow a simple factory function pattern: export function xService(db: Db) { return { ... } }
  • Routes follow export function xRoutes(db, deps...) { const router = Router(); ... return router; } pattern
  • All routes are mounted in server/src/app.ts

Environment Availability

Step 2.6: SKIPPED — Phase 40 is purely code and database changes. No external CLI tools, external services, or runtime binaries are required beyond Node.js 20, pnpm 9, and PostgreSQL already running.


Validation Architecture

Test Framework

| Property | Value |
|----------|-------|
| Framework | vitest (project dep) |
| Config file | server/vitest.config.ts (environment: "node") |
| Quick run command | pnpm test:run --project server -- --reporter=verbose src/__tests__/content-jobs* |
| Full suite command | pnpm test:run |

Phase Requirements → Test Map

| Req ID | Behavior | Test Type | Automated Command | File Exists? |
|--------|----------|-----------|-------------------|--------------|
| INFRA-01 | POST /content-jobs returns 202 + jobId within 200ms | unit | pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-routes.test.ts | No (Wave 0) |
| INFRA-01 | Job transitions queued → running → done/failed | unit | same file | No (Wave 0) |
| INFRA-02 | SSE endpoint delivers progress events before terminal state | unit | pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-sse.test.ts | No (Wave 0) |
| INFRA-03 | storage.putFile with generated/ namespace stores bytes without size error | unit | pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs-storage.test.ts | No (Wave 0) |
| INFRA-04 | Asset created by job runner includes sourceTaskId | unit | covered in content-jobs-routes.test.ts | No (Wave 0) |

Sampling Rate

  • Per task commit: pnpm --filter @paperclipai/server test:run -- src/__tests__/content-jobs*.test.ts
  • Per wave merge: pnpm test:run
  • Phase gate: Full suite green before /gsd:verify-work

Wave 0 Gaps

  • server/src/__tests__/content-jobs-routes.test.ts — covers INFRA-01, INFRA-04
  • server/src/__tests__/content-jobs-sse.test.ts — covers INFRA-02
  • server/src/__tests__/content-jobs-storage.test.ts — covers INFRA-03

(No new test framework needed — vitest already configured for server.)


Project Constraints (from CLAUDE.md)

CLAUDE.md does not exist at the project root. The constraints below are derived from STATE.md key decisions which carry the same authority:

  1. Async job pattern is mandatory — all render requests return 202 + job ID immediately; never block HTTP on render
  2. content_jobs table must exist before any renderer is built; this phase is the hard dependency for all others (phases 41-45)
  3. sourceTaskId required on every generated asset from day one — prevents SSD orphan accumulation
  4. MAX_GENERATED_ASSET_BYTES constant bypasses the 10MB upload limit for generated/ namespace — separate from upload route
  5. Async pattern: renderPipelineService stub must exist by end of phase (even as no-op) so phase 41 can extend it
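Constraint 5 calls for a renderPipelineService stub, but the file layout under Architecture Patterns does not list one. The sketch below shows one possible no-op shape that phase 41 could extend. The RenderRequest/RenderResult types, the register/supports/render methods, and the per-jobType registry are all hypothetical. They were chosen to line up with the renderContent(job.jobType, job.input) call and the result.filename/contentType/buffer fields in the runner example.

```typescript
interface RenderRequest {
  jobType: string;
  input: Record<string, unknown>;
}

interface RenderResult {
  filename: string;
  contentType: string;
  buffer: Uint8Array; // a Node Buffer is a Uint8Array, so renderer output fits here
}

type Renderer = (input: Record<string, unknown>) => Promise<RenderResult>;

// Factory-function shape, matching the xService() convention used elsewhere.
// Phase 40 ships with no renderers registered; later phases call register().
function renderPipelineService() {
  const renderers = new Map<string, Renderer>();

  return {
    register(jobType: string, renderer: Renderer): void {
      if (renderers.has(jobType)) throw new Error(`Renderer already registered for ${jobType}`);
      renderers.set(jobType, renderer);
    },
    supports: (jobType: string) => renderers.has(jobType),
    // With nothing registered this always rejects, so the runner would mark the job failed.
    async render({ jobType, input }: RenderRequest): Promise<RenderResult> {
      const renderer = renderers.get(jobType);
      if (!renderer) throw new Error(`No renderer registered for job type "${jobType}"`);
      return renderer(input);
    },
  };
}
```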

Sources

Primary (HIGH confidence)

  • Codebase: server/src/services/live-events.ts — EventEmitter pub/sub pattern for SSE fan-out
  • Codebase: server/src/routes/voice.ts — SSE headers pattern (text/event-stream, flushHeaders, res.write)
  • Codebase: packages/db/src/schema/plugin_jobs.ts — job lifecycle table pattern (status, timestamps, logs)
  • Codebase: packages/db/src/schema/assets.ts — asset table shape for INFRA-04 extension
  • Codebase: server/src/storage/service.ts - putFile has no byte limit; limit is in multer (upload route only)
  • Codebase: packages/shared/src/constants.ts - LIVE_EVENT_TYPES pattern to extend for job events
  • Codebase: server/src/attachment-types.ts - MAX_ATTACHMENT_BYTES = 10MB; MAX_GENERATED_ASSET_BYTES to be added here
  • Codebase: packages/db/src/migrations/ — last migration is 0055_; next is 0056_
  • Project STATE.md — locked architecture decisions

Secondary (MEDIUM confidence)

  • Pattern inference from heartbeat_runs / plugin_job_runs tables (same repo) for content_jobs shape

Tertiary (LOW confidence)

None — all findings are from direct codebase inspection.


Metadata

Confidence breakdown:

  • Standard stack: HIGH — all verified from codebase
  • Architecture: HIGH — directly modeled on existing patterns in same repo
  • Pitfalls: HIGH — identified from direct code review of existing patterns

Research date: 2026-04-04
Valid until: 2026-05-04 (stable internal patterns)