--- phase: 40-job-infrastructure plan: 01 type: execute wave: 1 depends_on: [] files_modified: - packages/db/src/schema/content_jobs.ts - packages/db/src/schema/assets.ts - packages/db/src/schema/index.ts - packages/shared/src/constants.ts - server/src/attachment-types.ts - server/src/services/content-job-store.ts - server/src/services/content-job-runner.ts - server/src/services/index.ts autonomous: true requirements: - INFRA-01 - INFRA-03 - INFRA-04 must_haves: truths: - "content_jobs table exists in DB with queued/running/done/failed lifecycle columns" - "assets table has a source_task_id column for conversation linkage" - "LIVE_EVENT_TYPES includes content_job.queued, content_job.running, content_job.done, content_job.failed" - "MAX_GENERATED_ASSET_BYTES constant exists and defaults to 500MB" - "contentJobStore service can create, get, list, and transition jobs" - "contentJobRunner dispatches a job asynchronously without blocking, transitions through lifecycle, stores asset, publishes live events" artifacts: - path: "packages/db/src/schema/content_jobs.ts" provides: "content_jobs table definition with status lifecycle" exports: ["contentJobs", "CONTENT_JOB_STATUSES"] - path: "server/src/services/content-job-store.ts" provides: "CRUD service for content_jobs table" exports: ["contentJobStore"] - path: "server/src/services/content-job-runner.ts" provides: "Async job dispatcher with live event publishing" exports: ["contentJobRunner"] key_links: - from: "server/src/services/content-job-runner.ts" to: "server/src/services/content-job-store.ts" via: "store.transition() calls" pattern: "store\\.transition" - from: "server/src/services/content-job-runner.ts" to: "server/src/services/live-events.ts" via: "publishLiveEvent for content_job.* events" pattern: "publishLiveEvent.*content_job" - from: "server/src/services/content-job-runner.ts" to: "server/src/services/assets.ts" via: "assetService.create with sourceTaskId" pattern: "assetService.*create.*sourceTaskId" --- Create the content_jobs DB schema, shared constants, storage constant, and service layer (store + runner) that form the foundation for all async content generation in v1.7. Purpose: Phase 40 is the hard dependency for phases 41-45. This plan establishes the data model and async execution engine so Plan 02 can wire HTTP routes on top. Output: Schema file, two migrations, updated constants, contentJobStore service, contentJobRunner service. @$HOME/.claude/get-shit-done/workflows/execute-plan.md @$HOME/.claude/get-shit-done/templates/summary.md @.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/40-job-infrastructure/40-RESEARCH.md From packages/db/src/schema/assets.ts: ```typescript export const assets = pgTable("assets", { id: uuid("id").primaryKey().defaultRandom(), companyId: uuid("company_id").notNull().references(() => companies.id), provider: text("provider").notNull(), objectKey: text("object_key").notNull(), contentType: text("content_type").notNull(), byteSize: integer("byte_size").notNull(), sha256: text("sha256").notNull(), originalFilename: text("original_filename"), createdByAgentId: uuid("created_by_agent_id").references(() => agents.id), createdByUserId: text("created_by_user_id"), createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), }, (table) => ({ ... })); ``` From packages/shared/src/constants.ts: ```typescript export const LIVE_EVENT_TYPES = [ "heartbeat.run.queued", "heartbeat.run.status", "heartbeat.run.event", "heartbeat.run.log", "agent.status", "activity.logged", "plugin.ui.updated", "plugin.worker.crashed", "plugin.worker.restarted", ] as const; export type LiveEventType = (typeof LIVE_EVENT_TYPES)[number]; ``` From server/src/attachment-types.ts: ```typescript export const MAX_ATTACHMENT_BYTES = Number(process.env.PAPERCLIP_ATTACHMENT_MAX_BYTES) || 10 * 1024 * 1024; ``` From server/src/services/live-events.ts: ```typescript export function publishLiveEvent(input: { companyId: string; type: LiveEventType; payload?: Record; }): LiveEvent; export function subscribeCompanyLiveEvents(companyId: string, listener: (event: LiveEvent) => void): () => void; ``` From server/src/services/assets.ts: ```typescript export function assetService(db: Db) { return { create: (companyId: string, data: Omit) => ..., getById: (id: string) => ..., }; } ``` From server/src/services/index.ts — barrel export pattern: ```typescript export { assetService } from "./assets.js"; export { publishLiveEvent, subscribeCompanyLiveEvents } from "./live-events.js"; ``` From packages/db/src/schema/index.ts — barrel export pattern: ```typescript export { assets } from "./assets.js"; ``` Task 1: Schema, constants, and migrations packages/db/src/schema/content_jobs.ts, packages/db/src/schema/assets.ts, packages/db/src/schema/index.ts, packages/shared/src/constants.ts, server/src/attachment-types.ts packages/db/src/schema/assets.ts, packages/db/src/schema/index.ts, packages/shared/src/constants.ts, server/src/attachment-types.ts, packages/db/src/schema/plugin_jobs.ts, packages/db/src/schema/heartbeat_runs.ts - content_jobs.ts exports contentJobs pgTable with columns: id (uuid PK defaultRandom), companyId (uuid FK companies), jobType (text), status (text default "queued"), input (jsonb default {}), resultAssetId (uuid nullable), errorMessage (text nullable), sourceTaskId (text nullable), startedAt (timestamp tz nullable), finishedAt (timestamp tz nullable), createdAt (timestamp tz defaultNow), updatedAt (timestamp tz defaultNow) - content_jobs.ts exports CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const and ContentJobStatus type - content_jobs.ts defines indexes: content_jobs_company_status_idx on (companyId, status), content_jobs_company_created_idx on (companyId, createdAt) - assets.ts gains sourceTaskId: text("source_task_id") column (nullable, no FK) - LIVE_EVENT_TYPES array includes "content_job.queued", "content_job.running", "content_job.done", "content_job.failed" - MAX_GENERATED_ASSET_BYTES exported from attachment-types.ts, defaults to 500 * 1024 * 1024 1. Create `packages/db/src/schema/content_jobs.ts`: - Import pgTable, uuid, text, timestamp, jsonb, index from "drizzle-orm/pg-core" - Import companies from "./companies.js" - Export `CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const` - Export `type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number]` - Export `contentJobs` pgTable "content_jobs" with exact columns from behavior block - Add two indexes in the third argument: `content_jobs_company_status_idx` on (companyId, status), `content_jobs_company_created_idx` on (companyId, createdAt) 2. Modify `packages/db/src/schema/assets.ts`: - Add column after `createdByUserId`: `sourceTaskId: text("source_task_id"),` (nullable, no FK, no default) - No other changes 3. Modify `packages/db/src/schema/index.ts`: - Add line: `export { contentJobs, CONTENT_JOB_STATUSES } from "./content_jobs.js";` - Place it after the `assets` export line 4. Modify `packages/shared/src/constants.ts`: - Add four entries to LIVE_EVENT_TYPES array before the `] as const;` closing: `"content_job.queued",` `"content_job.running",` `"content_job.done",` `"content_job.failed",` 5. Modify `server/src/attachment-types.ts`: - Add after the existing `MAX_ATTACHMENT_BYTES` export: ``` export const MAX_GENERATED_ASSET_BYTES = Number(process.env.PAPERCLIP_GENERATED_ASSET_MAX_BYTES) || 500 * 1024 * 1024; ``` 6. Generate migrations: - Run `cd /opt/nexus && pnpm db:generate` - Verify two new migration files appear (0056 for content_jobs, 0057 for assets source_task_id — or combined) - Run `pnpm db:migrate` to apply Anti-patterns to avoid: - Do NOT add a FK constraint for sourceTaskId — task IDs are string identifiers, not UUID FKs - Do NOT add resultAssetId as a FK to assets — it's populated after asset creation, circular FK causes issues cd /opt/nexus && pnpm tsc --noEmit --project packages/db/tsconfig.json && pnpm tsc --noEmit --project packages/shared/tsconfig.json && pnpm tsc --noEmit --project server/tsconfig.json - packages/db/src/schema/content_jobs.ts contains `export const contentJobs = pgTable("content_jobs"` - packages/db/src/schema/content_jobs.ts contains `export const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const` - packages/db/src/schema/content_jobs.ts contains `sourceTaskId: text("source_task_id")` - packages/db/src/schema/content_jobs.ts contains `content_jobs_company_status_idx` - packages/db/src/schema/assets.ts contains `sourceTaskId: text("source_task_id")` - packages/db/src/schema/index.ts contains `export { contentJobs, CONTENT_JOB_STATUSES } from "./content_jobs.js"` - packages/shared/src/constants.ts contains `"content_job.queued"` - packages/shared/src/constants.ts contains `"content_job.done"` - server/src/attachment-types.ts contains `export const MAX_GENERATED_ASSET_BYTES` - server/src/attachment-types.ts contains `500 * 1024 * 1024` - Migration files exist in packages/db/src/migrations/ numbered 0056 or higher - `pnpm tsc --noEmit` passes for db, shared, and server packages content_jobs schema defined with all columns and indexes, assets table has sourceTaskId, LIVE_EVENT_TYPES extended with 4 content_job event types, MAX_GENERATED_ASSET_BYTES constant exported, migrations generated and applied, TypeScript compiles clean Task 2: contentJobStore and contentJobRunner services server/src/services/content-job-store.ts, server/src/services/content-job-runner.ts, server/src/services/index.ts server/src/services/assets.ts, server/src/services/live-events.ts, server/src/services/index.ts, server/src/storage/types.ts, packages/db/src/schema/content_jobs.ts - contentJobStore(db) returns { create, getById, listByCompany, transition } - create(companyId, { jobType, input, sourceTaskId }) inserts and returns the row - getById(id) returns job or null - listByCompany(companyId) returns jobs ordered by createdAt desc - transition(id, patch) updates the row with patch + updatedAt = new Date() - contentJobRunner.dispatch(db, storage, job) fires and forgets — never awaited by caller - Runner transitions job to "running" then to "done" or "failed" - Runner publishes content_job.running, then content_job.done or content_job.failed via publishLiveEvent - Runner always awaits DB write BEFORE publishing live event (prevents stale reads) - On "done": runner calls storage.putFile with namespace "generated", then assetService.create with sourceTaskId, then transitions to done with resultAssetId - On error: runner transitions to failed with errorMessage - Runner has a renderContent stub that returns a fixed test buffer (placeholder for phase 41+) - Runner checks MAX_GENERATED_ASSET_BYTES before calling putFile 1. Create `server/src/services/content-job-store.ts`: ```typescript import { eq, desc } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { contentJobs } from "@paperclipai/db"; export function contentJobStore(db: Db) { return { create: (companyId: string, data: { jobType: string; input: Record; sourceTaskId: string | null }) => db.insert(contentJobs).values({ companyId, ...data }).returning().then((r) => r[0]), getById: (id: string) => db.select().from(contentJobs).where(eq(contentJobs.id, id)).then((r) => r[0] ?? null), listByCompany: (companyId: string) => db.select().from(contentJobs) .where(eq(contentJobs.companyId, companyId)) .orderBy(desc(contentJobs.createdAt)), transition: (id: string, patch: Partial) => db.update(contentJobs).set({ ...patch, updatedAt: new Date() }).where(eq(contentJobs.id, id)), }; } ``` 2. Create `server/src/services/content-job-runner.ts`: - Import publishLiveEvent from "./live-events.js" - Import contentJobStore from "./content-job-store.js" - Import assetService from "./assets.js" - Import MAX_GENERATED_ASSET_BYTES from "../attachment-types.js" - Import type { Db } from "@paperclipai/db" - Import type { StorageService } from "../storage/types.js" - Import type { contentJobs } from "@paperclipai/db" Define type `ContentJob = typeof contentJobs.$inferSelect` Export `renderContent` stub function: ```typescript export async function renderContent( _jobType: string, _input: Record, ): Promise<{ filename: string; contentType: string; buffer: Buffer }> { // Stub — phases 41-45 will add real renderers keyed by jobType return { filename: "placeholder.txt", contentType: "text/plain", buffer: Buffer.from("placeholder output"), }; } ``` Export `contentJobRunner` object with a single `dispatch` method: ```typescript export const contentJobRunner = { dispatch(db: Db, storage: StorageService, job: ContentJob): void { void runJob(db, storage, job); }, }; ``` Implement `async function runJob(db, storage, job)`: - Create store via `contentJobStore(db)` - Transition to "running" with startedAt: new Date(), then publishLiveEvent type "content_job.running" payload { jobId: job.id } - Try block: - Call renderContent(job.jobType, job.input as Record) - Check result.buffer.byteLength <= MAX_GENERATED_ASSET_BYTES, throw if exceeded - Call storage.putFile({ companyId: job.companyId, namespace: "generated", originalFilename: result.filename, contentType: result.contentType, body: result.buffer }) - Call assetService(db).create(job.companyId, { ...stored, sourceTaskId: job.sourceTaskId, createdByAgentId: null, createdByUserId: null }) - Transition to "done" with resultAssetId: asset.id, finishedAt: new Date() - publishLiveEvent type "content_job.done" payload { jobId: job.id, assetId: asset.id } - Catch block: - Extract errorMessage from err - Transition to "failed" with errorMessage, finishedAt: new Date() - publishLiveEvent type "content_job.failed" payload { jobId: job.id, errorMessage } 3. Modify `server/src/services/index.ts`: - Add: `export { contentJobStore } from "./content-job-store.js";` - Add: `export { contentJobRunner } from "./content-job-runner.js";` Anti-patterns to avoid: - Do NOT await dispatch in the caller — it must be fire-and-forget (`void`) - Do NOT publish live event before DB write completes — always `await store.transition(...)` first - Do NOT import multer anywhere in runner code — runner writes directly via storage.putFile cd /opt/nexus && pnpm tsc --noEmit --project server/tsconfig.json - server/src/services/content-job-store.ts contains `export function contentJobStore(db: Db)` - server/src/services/content-job-store.ts contains `db.insert(contentJobs).values(` - server/src/services/content-job-store.ts contains `transition:` - server/src/services/content-job-runner.ts contains `export const contentJobRunner` - server/src/services/content-job-runner.ts contains `void runJob(db, storage, job)` - server/src/services/content-job-runner.ts contains `publishLiveEvent(` - server/src/services/content-job-runner.ts contains `content_job.running` - server/src/services/content-job-runner.ts contains `content_job.done` - server/src/services/content-job-runner.ts contains `content_job.failed` - server/src/services/content-job-runner.ts contains `MAX_GENERATED_ASSET_BYTES` - server/src/services/content-job-runner.ts contains `namespace: "generated"` - server/src/services/content-job-runner.ts contains `sourceTaskId: job.sourceTaskId` - server/src/services/content-job-runner.ts contains `export async function renderContent` - server/src/services/index.ts contains `export { contentJobStore } from "./content-job-store.js"` - server/src/services/index.ts contains `export { contentJobRunner } from "./content-job-runner.js"` - `pnpm tsc --noEmit --project server/tsconfig.json` exits 0 contentJobStore provides create/getById/listByCompany/transition. contentJobRunner.dispatch fires and forgets, transitions through lifecycle, stores generated asset with sourceTaskId, publishes live events after DB writes. Both exported from services/index.ts. TypeScript compiles clean. - `pnpm tsc --noEmit` passes across db, shared, and server packages - content_jobs table exists in database after migration - assets table has source_task_id column after migration - All new exports resolve without import errors - content_jobs schema has all lifecycle columns with correct types - assets.sourceTaskId column exists (nullable text) - LIVE_EVENT_TYPES includes 4 content_job.* entries - MAX_GENERATED_ASSET_BYTES = 500MB default - contentJobStore CRUD works against DB - contentJobRunner dispatches async, transitions lifecycle, stores asset, publishes events - TypeScript compiles clean across all affected packages After completion, create `.planning/phases/40-job-infrastructure/40-01-SUMMARY.md`