nexus/.planning/phases/40-job-infrastructure/40-01-PLAN.md

---
phase: 40-job-infrastructure
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
  - packages/db/src/schema/content_jobs.ts
  - packages/db/src/schema/assets.ts
  - packages/db/src/schema/index.ts
  - packages/shared/src/constants.ts
  - server/src/attachment-types.ts
  - server/src/services/content-job-store.ts
  - server/src/services/content-job-runner.ts
  - server/src/services/index.ts
autonomous: true
requirements:
  - INFRA-01
  - INFRA-03
  - INFRA-04
must_haves:
  truths:
    - "content_jobs table exists in DB with queued/running/done/failed lifecycle columns"
    - "assets table has a source_task_id column for conversation linkage"
    - "LIVE_EVENT_TYPES includes content_job.queued, content_job.running, content_job.done, content_job.failed"
    - "MAX_GENERATED_ASSET_BYTES constant exists and defaults to 500MB"
    - "contentJobStore service can create, get, list, and transition jobs"
    - "contentJobRunner dispatches a job asynchronously without blocking, transitions through lifecycle, stores asset, publishes live events"
  artifacts:
    - path: packages/db/src/schema/content_jobs.ts
      provides: "content_jobs table definition with status lifecycle"
      exports:
        - contentJobs
        - CONTENT_JOB_STATUSES
    - path: server/src/services/content-job-store.ts
      provides: "CRUD service for content_jobs table"
      exports:
        - contentJobStore
    - path: server/src/services/content-job-runner.ts
      provides: "Async job dispatcher with live event publishing"
      exports:
        - contentJobRunner
  key_links:
    - from: server/src/services/content-job-runner.ts
      to: server/src/services/content-job-store.ts
      via: "store.transition() calls"
      pattern: "store.transition"
    - from: server/src/services/content-job-runner.ts
      to: server/src/services/live-events.ts
      via: "publishLiveEvent for content_job.* events"
      pattern: "publishLiveEvent.*content_job"
    - from: server/src/services/content-job-runner.ts
      to: server/src/services/assets.ts
      via: "assetService.create with sourceTaskId"
      pattern: "assetService.*create.*sourceTaskId"
---

Create the content_jobs DB schema, shared constants, storage constant, and service layer (store + runner) that form the foundation for all async content generation in v1.7.

Purpose: Phase 40 is the hard dependency for phases 41-45. This plan establishes the data model and async execution engine so Plan 02 can wire HTTP routes on top.

Output: Schema file, two migrations, updated constants, contentJobStore service, contentJobRunner service.

<execution_context> @$HOME/.claude/get-shit-done/workflows/execute-plan.md @$HOME/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/40-job-infrastructure/40-RESEARCH.md

From packages/db/src/schema/assets.ts:

export const assets = pgTable("assets", {
  id: uuid("id").primaryKey().defaultRandom(),
  companyId: uuid("company_id").notNull().references(() => companies.id),
  provider: text("provider").notNull(),
  objectKey: text("object_key").notNull(),
  contentType: text("content_type").notNull(),
  byteSize: integer("byte_size").notNull(),
  sha256: text("sha256").notNull(),
  originalFilename: text("original_filename"),
  createdByAgentId: uuid("created_by_agent_id").references(() => agents.id),
  createdByUserId: text("created_by_user_id"),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
}, (table) => ({ ... }));

From packages/shared/src/constants.ts:

export const LIVE_EVENT_TYPES = [
  "heartbeat.run.queued",
  "heartbeat.run.status",
  "heartbeat.run.event",
  "heartbeat.run.log",
  "agent.status",
  "activity.logged",
  "plugin.ui.updated",
  "plugin.worker.crashed",
  "plugin.worker.restarted",
] as const;
export type LiveEventType = (typeof LIVE_EVENT_TYPES)[number];

From server/src/attachment-types.ts:

export const MAX_ATTACHMENT_BYTES =
  Number(process.env.PAPERCLIP_ATTACHMENT_MAX_BYTES) || 10 * 1024 * 1024;
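
The `Number(...) || default` idiom in this excerpt treats every falsy parse result as "not configured". The sketch below illustrates which inputs fall back. It assumes a hypothetical helper, `resolveByteLimit`, that receives the raw string instead of reading `process.env` itself; neither the helper nor the `limits` object exists in the codebase.

```typescript
// Hypothetical helper mirroring the `Number(raw) || fallback` idiom.
// It is not part of the codebase; it only illustrates the fallback rules.
export function resolveByteLimit(raw: string | undefined, fallback: number): number {
  // Number(undefined) and Number("abc") are NaN; Number("") and Number("0") are 0.
  // All of these are falsy, so `||` selects the fallback.
  return Number(raw) || fallback;
}

const TEN_MIB = 10 * 1024 * 1024;

export const limits = {
  unset: resolveByteLimit(undefined, TEN_MIB), // falls back
  garbage: resolveByteLimit("abc", TEN_MIB), // falls back
  zero: resolveByteLimit("0", TEN_MIB), // falls back: 0 cannot be configured
  explicit: resolveByteLimit("2048", TEN_MIB), // uses the configured value
};
```

One consequence worth keeping in mind when the same idiom is reused for `MAX_GENERATED_ASSET_BYTES`: a limit of `0` cannot be expressed through the environment variable, and a negative value passes through unchanged.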

From server/src/services/live-events.ts:

export function publishLiveEvent(input: {
  companyId: string;
  type: LiveEventType;
  payload?: Record<string, unknown>;
}): LiveEvent;
export function subscribeCompanyLiveEvents(companyId: string, listener: (event: LiveEvent) => void): () => void;

From server/src/services/assets.ts:

export function assetService(db: Db) {
  return {
    create: (companyId: string, data: Omit<typeof assets.$inferInsert, "companyId">) => ...,
    getById: (id: string) => ...,
  };
}

From server/src/services/index.ts — barrel export pattern:

export { assetService } from "./assets.js";
export { publishLiveEvent, subscribeCompanyLiveEvents } from "./live-events.js";

From packages/db/src/schema/index.ts — barrel export pattern:

export { assets } from "./assets.js";
Task 1: Schema, constants, and migrations

Files: packages/db/src/schema/content_jobs.ts, packages/db/src/schema/assets.ts, packages/db/src/schema/index.ts, packages/shared/src/constants.ts, server/src/attachment-types.ts

Read first: packages/db/src/schema/assets.ts, packages/db/src/schema/index.ts, packages/shared/src/constants.ts, server/src/attachment-types.ts, packages/db/src/schema/plugin_jobs.ts, packages/db/src/schema/heartbeat_runs.ts

Behavior:
- content_jobs.ts exports contentJobs pgTable with columns: id (uuid PK defaultRandom), companyId (uuid FK companies), jobType (text), status (text default "queued"), input (jsonb default {}), resultAssetId (uuid nullable), errorMessage (text nullable), sourceTaskId (text nullable), startedAt (timestamp tz nullable), finishedAt (timestamp tz nullable), createdAt (timestamp tz defaultNow), updatedAt (timestamp tz defaultNow)
- content_jobs.ts exports CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const and ContentJobStatus type
- content_jobs.ts defines indexes: content_jobs_company_status_idx on (companyId, status), content_jobs_company_created_idx on (companyId, createdAt)
- assets.ts gains sourceTaskId: text("source_task_id") column (nullable, no FK)
- LIVE_EVENT_TYPES array includes "content_job.queued", "content_job.running", "content_job.done", "content_job.failed"
- MAX_GENERATED_ASSET_BYTES exported from attachment-types.ts, defaults to 500 * 1024 * 1024

Action:

1. Create `packages/db/src/schema/content_jobs.ts`:
   - Import pgTable, uuid, text, timestamp, jsonb, index from "drizzle-orm/pg-core"
   - Import companies from "./companies.js"
   - Export `CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const`
   - Export `type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number]`
   - Export `contentJobs` pgTable "content_jobs" with exact columns from behavior block
   - Add two indexes in the third argument: `content_jobs_company_status_idx` on (companyId, status), `content_jobs_company_created_idx` on (companyId, createdAt)

2. Modify `packages/db/src/schema/assets.ts`:
   - Add column after `createdByUserId`: `sourceTaskId: text("source_task_id"),` (nullable, no FK, no default)
   - No other changes

3. Modify `packages/db/src/schema/index.ts`:
   - Add line: `export { contentJobs, CONTENT_JOB_STATUSES } from "./content_jobs.js";`
   - Place it after the `assets` export line

4. Modify `packages/shared/src/constants.ts`:
   - Add four entries to LIVE_EVENT_TYPES array before the `] as const;` closing:
     `"content_job.queued",`
     `"content_job.running",`
     `"content_job.done",`
     `"content_job.failed",`

5. Modify `server/src/attachment-types.ts`:
   - Add after the existing `MAX_ATTACHMENT_BYTES` export:
     ```
     export const MAX_GENERATED_ASSET_BYTES =
       Number(process.env.PAPERCLIP_GENERATED_ASSET_MAX_BYTES) || 500 * 1024 * 1024;
     ```

6. Generate migrations:
   - Run `cd /opt/nexus && pnpm db:generate`
   - Verify two new migration files appear (0056 for content_jobs, 0057 for assets source_task_id — or combined)
   - Run `pnpm db:migrate` to apply
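
The four live event names added in step 4 line up one-to-one with the four statuses from step 1. As a rough illustration of that relationship, the sketch below derives the event names from the status tuple. Only `CONTENT_JOB_STATUSES` and `ContentJobStatus` come from this plan; `ContentJobEventType`, `isContentJobStatus`, `eventTypeFor`, and `isTerminal` are hypothetical helpers introduced purely for illustration.

```typescript
// Status tuple and union type as specified in step 1.
export const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const;
export type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number];

// Everything below is hypothetical and not part of the plan.

// Event names derived from the status union with a template literal type.
export type ContentJobEventType = `content_job.${ContentJobStatus}`;

// Runtime guard for narrowing an untrusted string, for example a value
// read back from the plain-text `status` column.
export function isContentJobStatus(value: string): value is ContentJobStatus {
  return CONTENT_JOB_STATUSES.some((status) => status === value);
}

// Maps a status to the matching live event name.
export function eventTypeFor(status: ContentJobStatus): ContentJobEventType {
  return `content_job.${status}`;
}

// "done" and "failed" end the lifecycle; "queued" and "running" do not.
export function isTerminal(status: ContentJobStatus): boolean {
  return status === "done" || status === "failed";
}
```

Because `status` is stored as plain `text`, the database will accept any string; a guard like `isContentJobStatus` is one way to keep the TypeScript union and the stored values in agreement.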

Anti-patterns to avoid:
- Do NOT add a FK constraint for sourceTaskId — task IDs are string identifiers, not UUID FKs
- Do NOT add resultAssetId as a FK to assets — it's populated after asset creation, circular FK causes issues

Verify: `cd /opt/nexus && pnpm tsc --noEmit --project packages/db/tsconfig.json && pnpm tsc --noEmit --project packages/shared/tsconfig.json && pnpm tsc --noEmit --project server/tsconfig.json`

Acceptance criteria:
- packages/db/src/schema/content_jobs.ts contains `export const contentJobs = pgTable("content_jobs"`
- packages/db/src/schema/content_jobs.ts contains `export const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const`
- packages/db/src/schema/content_jobs.ts contains `sourceTaskId: text("source_task_id")`
- packages/db/src/schema/content_jobs.ts contains `content_jobs_company_status_idx`
- packages/db/src/schema/assets.ts contains `sourceTaskId: text("source_task_id")`
- packages/db/src/schema/index.ts contains `export { contentJobs, CONTENT_JOB_STATUSES } from "./content_jobs.js"`
- packages/shared/src/constants.ts contains `"content_job.queued"`
- packages/shared/src/constants.ts contains `"content_job.done"`
- server/src/attachment-types.ts contains `export const MAX_GENERATED_ASSET_BYTES`
- server/src/attachment-types.ts contains `500 * 1024 * 1024`
- Migration files exist in packages/db/src/migrations/ numbered 0056 or higher
- `pnpm tsc --noEmit` passes for db, shared, and server packages

Done: content_jobs schema defined with all columns and indexes, assets table has sourceTaskId, LIVE_EVENT_TYPES extended with 4 content_job event types, MAX_GENERATED_ASSET_BYTES constant exported, migrations generated and applied, TypeScript compiles clean

Task 2: contentJobStore and contentJobRunner services

Files: server/src/services/content-job-store.ts, server/src/services/content-job-runner.ts, server/src/services/index.ts

Read first: server/src/services/assets.ts, server/src/services/live-events.ts, server/src/services/index.ts, server/src/storage/types.ts, packages/db/src/schema/content_jobs.ts

Behavior:
- contentJobStore(db) returns { create, getById, listByCompany, transition }
- create(companyId, { jobType, input, sourceTaskId }) inserts and returns the row
- getById(id) returns job or null
- listByCompany(companyId) returns jobs ordered by createdAt desc
- transition(id, patch) updates the row with patch + updatedAt = new Date()
- contentJobRunner.dispatch(db, storage, job) fires and forgets; the caller never awaits it
- Runner transitions job to "running" then to "done" or "failed"
- Runner publishes content_job.running, then content_job.done or content_job.failed via publishLiveEvent
- Runner always awaits DB write BEFORE publishing live event (prevents stale reads)
- On "done": runner calls storage.putFile with namespace "generated", then assetService.create with sourceTaskId, then transitions to done with resultAssetId
- On error: runner transitions to failed with errorMessage
- Runner has a renderContent stub that returns a fixed test buffer (placeholder for phase 41+)
- Runner checks MAX_GENERATED_ASSET_BYTES before calling putFile

Action:

1. Create `server/src/services/content-job-store.ts`:
   ```typescript
   import { eq, desc } from "drizzle-orm";
   import type { Db } from "@paperclipai/db";
   import { contentJobs } from "@paperclipai/db";

   export function contentJobStore(db: Db) {
     return {
       create: (companyId: string, data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null }) =>
         db.insert(contentJobs).values({ companyId, ...data }).returning().then((r) => r[0]),

       getById: (id: string) =>
         db.select().from(contentJobs).where(eq(contentJobs.id, id)).then((r) => r[0] ?? null),

       listByCompany: (companyId: string) =>
         db.select().from(contentJobs)
           .where(eq(contentJobs.companyId, companyId))
           .orderBy(desc(contentJobs.createdAt)),

       transition: (id: string, patch: Partial<typeof contentJobs.$inferInsert>) =>
         db.update(contentJobs).set({ ...patch, updatedAt: new Date() }).where(eq(contentJobs.id, id)),
     };
   }
   ```

2. Create `server/src/services/content-job-runner.ts`:
   - Import publishLiveEvent from "./live-events.js"
   - Import contentJobStore from "./content-job-store.js"
   - Import assetService from "./assets.js"
   - Import MAX_GENERATED_ASSET_BYTES from "../attachment-types.js"
   - Import type { Db } from "@paperclipai/db"
   - Import type { StorageService } from "../storage/types.js"
   - Import type { contentJobs } from "@paperclipai/db"

   Define type `ContentJob = typeof contentJobs.$inferSelect`

   Export `renderContent` stub function:
   ```typescript
   export async function renderContent(
     _jobType: string,
     _input: Record<string, unknown>,
   ): Promise<{ filename: string; contentType: string; buffer: Buffer }> {
     // Stub — phases 41-45 will add real renderers keyed by jobType
     return {
       filename: "placeholder.txt",
       contentType: "text/plain",
       buffer: Buffer.from("placeholder output"),
     };
   }
   ```

   Export `contentJobRunner` object with a single `dispatch` method:
   ```typescript
   export const contentJobRunner = {
     dispatch(db: Db, storage: StorageService, job: ContentJob): void {
       void runJob(db, storage, job);
     },
   };
   ```

   Implement `async function runJob(db, storage, job)`:
   - Create store via `contentJobStore(db)`
   - Transition to "running" with startedAt: new Date(), then publishLiveEvent type "content_job.running" payload { jobId: job.id }
   - Try block:
     - Call renderContent(job.jobType, job.input as Record<string, unknown>)
     - Check result.buffer.byteLength <= MAX_GENERATED_ASSET_BYTES, throw if exceeded
     - Call and await storage.putFile({ companyId: job.companyId, namespace: "generated", originalFilename: result.filename, contentType: result.contentType, body: result.buffer }); keep the returned value as `stored`
     - Call and await assetService(db).create(job.companyId, { ...stored, sourceTaskId: job.sourceTaskId, createdByAgentId: null, createdByUserId: null }); keep the returned row as `asset`
     - Transition to "done" with resultAssetId: asset.id, finishedAt: new Date()
     - publishLiveEvent type "content_job.done" payload { jobId: job.id, assetId: asset.id }
   - Catch block:
     - Extract errorMessage from err
     - Transition to "failed" with errorMessage, finishedAt: new Date()
     - publishLiveEvent type "content_job.failed" payload { jobId: job.id, errorMessage }

3. Modify `server/src/services/index.ts`:
   - Add: `export { contentJobStore } from "./content-job-store.js";`
   - Add: `export { contentJobRunner } from "./content-job-runner.js";`
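
Step 2's catch block says to extract `errorMessage` from `err` without spelling out how. Since a caught value is `unknown` in TypeScript, it has to be narrowed before it can be stored in the `errorMessage` text column. One possible approach is sketched below; `toErrorMessage` is a hypothetical helper name, not something the plan or codebase defines.

```typescript
// Hypothetical helper for the runner's catch block.
// Converts any thrown value into a string suitable for `errorMessage`.
export function toErrorMessage(err: unknown): string {
  // The common case: a real Error (or subclass) carries its own message.
  if (err instanceof Error) return err.message;
  // Code that throws a bare string keeps that string as-is.
  if (typeof err === "string") return err;
  try {
    // JSON.stringify returns undefined for inputs such as `undefined` or functions.
    const json: string | undefined = JSON.stringify(err);
    return json === undefined ? String(err) : json;
  } catch {
    // Circular structures and BigInt values make JSON.stringify throw.
    return String(err);
  }
}
```

Inside `runJob`, the catch block could then read `const errorMessage = toErrorMessage(err);` before the transition to "failed".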

Anti-patterns to avoid:
- Do NOT await dispatch in the caller — it must be fire-and-forget (`void`)
- Do NOT publish live event before DB write completes — always `await store.transition(...)` first
- Do NOT import multer anywhere in runner code — runner writes directly via storage.putFile
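
The ordering rules above (write to the DB, then publish; fire-and-forget dispatch; size check before storing) can be illustrated without a database. The sketch below is a simplified model, not the runner itself: `Job`, `Rendered`, `table`, `log`, `transition`, and `publish` are all hypothetical in-memory stand-ins for the real store, storage, and live-event services, and the asset creation step is reduced to a made-up ID.

```typescript
// Hypothetical stand-ins so the sketch runs without a database, storage
// backend, or event bus. None of these names exist in the codebase.
type Status = "queued" | "running" | "done" | "failed";

export interface Job {
  id: string;
  status: Status;
  resultAssetId: string | null;
  errorMessage: string | null;
}

export interface Rendered {
  filename: string;
  bytes: Uint8Array;
}

// Records every side effect in the order it happens.
export const log: string[] = [];

// In-memory replacement for the content_jobs table.
export const table = new Map<string, Job>();

// Stand-in for store.transition(): resolves only after the "row" is updated.
async function transition(id: string, patch: Partial<Job>): Promise<void> {
  const current = table.get(id);
  if (!current) throw new Error(`unknown job ${id}`);
  const next: Job = { ...current, ...patch };
  table.set(id, next);
  log.push(`db:${next.status}`);
}

// Stand-in for publishLiveEvent().
function publish(type: string): void {
  log.push(`event:${type}`);
}

export async function runJob(
  job: Job,
  render: () => Promise<Rendered>,
  maxBytes: number,
): Promise<void> {
  // Write first, publish second, so subscribers never read a stale row.
  await transition(job.id, { status: "running" });
  publish("content_job.running");
  try {
    const out = await render();
    if (out.bytes.byteLength > maxBytes) {
      throw new Error(`generated asset exceeds ${maxBytes} bytes`);
    }
    // Stands in for storage.putFile followed by assetService.create.
    const assetId = `asset-for-${job.id}`;
    await transition(job.id, { status: "done", resultAssetId: assetId });
    publish("content_job.done");
  } catch (err) {
    const errorMessage = err instanceof Error ? err.message : String(err);
    await transition(job.id, { status: "failed", errorMessage });
    publish("content_job.failed");
  }
}

// Fire-and-forget entry point: returns immediately, work continues in the background.
export function dispatch(job: Job, render: () => Promise<Rendered>, maxBytes: number): void {
  // Assumption beyond the plan: a final catch so a failed DB write cannot
  // surface as an unhandled promise rejection.
  void runJob(job, render, maxBytes).catch(() => undefined);
}
```

A caller would insert the job row first and then call `dispatch(job, render, maxBytes)`, which returns before any rendering happens. The trailing `.catch` in `dispatch` is an addition that this plan does not ask for: with a bare `void runJob(...)`, a rejection from the initial "running" transition or from the "failed" transition inside the catch block has no handler, so it may be worth considering during implementation.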

Verify: `cd /opt/nexus && pnpm tsc --noEmit --project server/tsconfig.json`

Acceptance criteria:
- server/src/services/content-job-store.ts contains `export function contentJobStore(db: Db)`
- server/src/services/content-job-store.ts contains `db.insert(contentJobs).values(`
- server/src/services/content-job-store.ts contains `transition:`
- server/src/services/content-job-runner.ts contains `export const contentJobRunner`
- server/src/services/content-job-runner.ts contains `void runJob(db, storage, job)`
- server/src/services/content-job-runner.ts contains `publishLiveEvent(`
- server/src/services/content-job-runner.ts contains `content_job.running`
- server/src/services/content-job-runner.ts contains `content_job.done`
- server/src/services/content-job-runner.ts contains `content_job.failed`
- server/src/services/content-job-runner.ts contains `MAX_GENERATED_ASSET_BYTES`
- server/src/services/content-job-runner.ts contains `namespace: "generated"`
- server/src/services/content-job-runner.ts contains `sourceTaskId: job.sourceTaskId`
- server/src/services/content-job-runner.ts contains `export async function renderContent`
- server/src/services/index.ts contains `export { contentJobStore } from "./content-job-store.js"`
- server/src/services/index.ts contains `export { contentJobRunner } from "./content-job-runner.js"`
- `pnpm tsc --noEmit --project server/tsconfig.json` exits 0

Done: contentJobStore provides create/getById/listByCompany/transition. contentJobRunner.dispatch fires and forgets, transitions through lifecycle, stores generated asset with sourceTaskId, publishes live events after DB writes. Both exported from services/index.ts. TypeScript compiles clean.

Verification:
- `pnpm tsc --noEmit` passes across db, shared, and server packages
- content_jobs table exists in database after migration
- assets table has source_task_id column after migration
- All new exports resolve without import errors

<success_criteria>

- content_jobs schema has all lifecycle columns with correct types
- assets.sourceTaskId column exists (nullable text)
- LIVE_EVENT_TYPES includes 4 content_job.* entries
- MAX_GENERATED_ASSET_BYTES = 500MB default
- contentJobStore CRUD works against DB
- contentJobRunner dispatches async, transitions lifecycle, stores asset, publishes events
- TypeScript compiles clean across all affected packages

</success_criteria>

After completion, create `.planning/phases/40-job-infrastructure/40-01-SUMMARY.md`