---
phase: 40-job-infrastructure
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
- packages/db/src/schema/content_jobs.ts
- packages/db/src/schema/assets.ts
- packages/db/src/schema/index.ts
- packages/shared/src/constants.ts
- server/src/attachment-types.ts
- server/src/services/content-job-store.ts
- server/src/services/content-job-runner.ts
- server/src/services/index.ts
autonomous: true
requirements:
- INFRA-01
- INFRA-03
- INFRA-04
must_haves:
truths:
- "content_jobs table exists in DB with queued/running/done/failed lifecycle columns"
- "assets table has a source_task_id column for conversation linkage"
- "LIVE_EVENT_TYPES includes content_job.queued, content_job.running, content_job.done, content_job.failed"
- "MAX_GENERATED_ASSET_BYTES constant exists and defaults to 500MB"
- "contentJobStore service can create, get, list, and transition jobs"
- "contentJobRunner dispatches a job asynchronously without blocking, transitions through lifecycle, stores asset, publishes live events"
artifacts:
- path: "packages/db/src/schema/content_jobs.ts"
provides: "content_jobs table definition with status lifecycle"
exports: ["contentJobs", "CONTENT_JOB_STATUSES"]
- path: "server/src/services/content-job-store.ts"
provides: "CRUD service for content_jobs table"
exports: ["contentJobStore"]
- path: "server/src/services/content-job-runner.ts"
provides: "Async job dispatcher with live event publishing"
exports: ["contentJobRunner"]
key_links:
- from: "server/src/services/content-job-runner.ts"
to: "server/src/services/content-job-store.ts"
via: "store.transition() calls"
pattern: "store\\.transition"
- from: "server/src/services/content-job-runner.ts"
to: "server/src/services/live-events.ts"
via: "publishLiveEvent for content_job.* events"
pattern: "publishLiveEvent.*content_job"
- from: "server/src/services/content-job-runner.ts"
to: "server/src/services/assets.ts"
via: "assetService.create with sourceTaskId"
pattern: "assetService.*create.*sourceTaskId"
---
Create the content_jobs DB schema, shared constants, storage constant, and service layer (store + runner) that form the foundation for all async content generation in v1.7.
Purpose: Phase 40 is the hard dependency for phases 41-45. This plan establishes the data model and async execution engine so Plan 02 can wire HTTP routes on top.
Output: Schema file, migrations (two, or one combined), updated constants, contentJobStore service, contentJobRunner service.
@$HOME/.claude/get-shit-done/workflows/execute-plan.md
@$HOME/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/40-job-infrastructure/40-RESEARCH.md
From packages/db/src/schema/assets.ts:
```typescript
export const assets = pgTable("assets", {
  id: uuid("id").primaryKey().defaultRandom(),
  companyId: uuid("company_id").notNull().references(() => companies.id),
  provider: text("provider").notNull(),
  objectKey: text("object_key").notNull(),
  contentType: text("content_type").notNull(),
  byteSize: integer("byte_size").notNull(),
  sha256: text("sha256").notNull(),
  originalFilename: text("original_filename"),
  createdByAgentId: uuid("created_by_agent_id").references(() => agents.id),
  createdByUserId: text("created_by_user_id"),
  createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(),
  updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(),
}, (table) => ({ ... }));
```
From packages/shared/src/constants.ts:
```typescript
export const LIVE_EVENT_TYPES = [
"heartbeat.run.queued",
"heartbeat.run.status",
"heartbeat.run.event",
"heartbeat.run.log",
"agent.status",
"activity.logged",
"plugin.ui.updated",
"plugin.worker.crashed",
"plugin.worker.restarted",
] as const;
export type LiveEventType = (typeof LIVE_EVENT_TYPES)[number];
```
From server/src/attachment-types.ts:
```typescript
export const MAX_ATTACHMENT_BYTES =
Number(process.env.PAPERCLIP_ATTACHMENT_MAX_BYTES) || 10 * 1024 * 1024;
```
From server/src/services/live-events.ts:
```typescript
export function publishLiveEvent(input: {
  companyId: string;
  type: LiveEventType;
  payload?: Record<string, unknown>;
}): LiveEvent;
export function subscribeCompanyLiveEvents(companyId: string, listener: (event: LiveEvent) => void): () => void;
```
From server/src/services/assets.ts:
```typescript
export function assetService(db: Db) {
  return {
    create: (companyId: string, data: Omit<typeof assets.$inferInsert, "companyId">) => ...,
    getById: (id: string) => ...,
  };
}
```
From server/src/services/index.ts — barrel export pattern:
```typescript
export { assetService } from "./assets.js";
export { publishLiveEvent, subscribeCompanyLiveEvents } from "./live-events.js";
```
From packages/db/src/schema/index.ts — barrel export pattern:
```typescript
export { assets } from "./assets.js";
```
Task 1: Schema, constants, and migrations
packages/db/src/schema/content_jobs.ts,
packages/db/src/schema/assets.ts,
packages/db/src/schema/index.ts,
packages/shared/src/constants.ts,
server/src/attachment-types.ts
packages/db/src/schema/assets.ts,
packages/db/src/schema/index.ts,
packages/shared/src/constants.ts,
server/src/attachment-types.ts,
packages/db/src/schema/plugin_jobs.ts,
packages/db/src/schema/heartbeat_runs.ts
- content_jobs.ts exports contentJobs pgTable with columns: id (uuid PK defaultRandom), companyId (uuid FK companies), jobType (text), status (text default "queued"), input (jsonb default {}), resultAssetId (uuid nullable), errorMessage (text nullable), sourceTaskId (text nullable), startedAt (timestamp tz nullable), finishedAt (timestamp tz nullable), createdAt (timestamp tz defaultNow), updatedAt (timestamp tz defaultNow)
- content_jobs.ts exports CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const and ContentJobStatus type
- content_jobs.ts defines indexes: content_jobs_company_status_idx on (companyId, status), content_jobs_company_created_idx on (companyId, createdAt)
- assets.ts gains sourceTaskId: text("source_task_id") column (nullable, no FK)
- LIVE_EVENT_TYPES array includes "content_job.queued", "content_job.running", "content_job.done", "content_job.failed"
- MAX_GENERATED_ASSET_BYTES exported from attachment-types.ts, defaults to 500 * 1024 * 1024
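The status list and the four live event names above follow the same naming pattern, which a short sketch can make explicit. This is a minimal illustration that leaves out the Drizzle table. `isContentJobStatus`, `ContentJobEventType`, and `eventTypeFor` are hypothetical helpers introduced here and are not part of the plan.

```typescript
// Mirrors the two exports the plan requires from content_jobs.ts.
const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const;
type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number];

// Hypothetical helper: narrows an arbitrary string (for example a value read
// from the text `status` column) to the ContentJobStatus union.
function isContentJobStatus(value: string): value is ContentJobStatus {
  return (CONTENT_JOB_STATUSES as readonly string[]).includes(value);
}

// Hypothetical type: the live event name that corresponds to each status.
type ContentJobEventType = `content_job.${ContentJobStatus}`;

// Hypothetical helper: builds the event name for a given status.
function eventTypeFor(status: ContentJobStatus): ContentJobEventType {
  return `content_job.${status}`;
}
```

Because `status` is stored as plain text, a guard of this kind is one way to keep the union type honest at the database boundary. Whether the runner derives event names or writes them as literals is a design choice; the acceptance criteria below only check that the literal strings are present.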
1. Create `packages/db/src/schema/content_jobs.ts`:
- Import pgTable, uuid, text, timestamp, jsonb, index from "drizzle-orm/pg-core"
- Import companies from "./companies.js"
- Export `CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const`
- Export `type ContentJobStatus = (typeof CONTENT_JOB_STATUSES)[number]`
- Export `contentJobs` pgTable "content_jobs" with exact columns from behavior block
- Add two indexes in the third argument: `content_jobs_company_status_idx` on (companyId, status), `content_jobs_company_created_idx` on (companyId, createdAt)
2. Modify `packages/db/src/schema/assets.ts`:
- Add column after `createdByUserId`: `sourceTaskId: text("source_task_id"),` (nullable, no FK, no default)
- No other changes
3. Modify `packages/db/src/schema/index.ts`:
- Add line: `export { contentJobs, CONTENT_JOB_STATUSES } from "./content_jobs.js";`
- Place it after the `assets` export line
4. Modify `packages/shared/src/constants.ts`:
- Add four entries to LIVE_EVENT_TYPES array before the `] as const;` closing:
`"content_job.queued",`
`"content_job.running",`
`"content_job.done",`
`"content_job.failed",`
5. Modify `server/src/attachment-types.ts`:
- Add after the existing `MAX_ATTACHMENT_BYTES` export:
```typescript
export const MAX_GENERATED_ASSET_BYTES =
  Number(process.env.PAPERCLIP_GENERATED_ASSET_MAX_BYTES) || 500 * 1024 * 1024;
```
6. Generate migrations:
- Run `cd /opt/nexus && pnpm db:generate`
- Verify two new migration files appear (0056 for content_jobs, 0057 for assets source_task_id — or combined)
- Run `pnpm db:migrate` to apply
Anti-patterns to avoid:
- Do NOT add a FK constraint for sourceTaskId — task IDs are string identifiers, not UUID FKs
- Do NOT add resultAssetId as a FK to assets: it is populated only after asset creation, and a circular FK causes issues
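The `Number(...) || default` expression used for `MAX_GENERATED_ASSET_BYTES` in step 5 has a few edge cases worth knowing. The sketch below isolates the same expression in a hypothetical helper, `resolveByteLimit`, so the fallback behavior is easy to see. The helper and the `DEFAULT_GENERATED_ASSET_BYTES` name are assumptions for illustration only.

```typescript
// 500 MB expressed in bytes, as in the plan's default.
const DEFAULT_GENERATED_ASSET_BYTES = 500 * 1024 * 1024;

// Hypothetical helper mirroring `Number(raw) || fallback`.
// Number(undefined) and Number("abc") are NaN, Number("") and Number("0")
// are 0; all of these are falsy, so `||` falls through to the fallback.
function resolveByteLimit(raw: string | undefined, fallback: number): number {
  return Number(raw) || fallback;
}

// A valid numeric string overrides the default.
const overridden = resolveByteLimit("1048576", DEFAULT_GENERATED_ASSET_BYTES);

// An unset variable keeps the default.
const defaulted = resolveByteLimit(undefined, DEFAULT_GENERATED_ASSET_BYTES);
```

Two consequences follow from this pattern: a value of `0` cannot be used to disable generation, because it falls back to the default, and a negative number is truthy and would be accepted as the limit. If either matters, an explicit range check would be needed; the plan does not require one.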
cd /opt/nexus && pnpm tsc --noEmit --project packages/db/tsconfig.json && pnpm tsc --noEmit --project packages/shared/tsconfig.json && pnpm tsc --noEmit --project server/tsconfig.json
- packages/db/src/schema/content_jobs.ts contains `export const contentJobs = pgTable("content_jobs"`
- packages/db/src/schema/content_jobs.ts contains `export const CONTENT_JOB_STATUSES = ["queued", "running", "done", "failed"] as const`
- packages/db/src/schema/content_jobs.ts contains `sourceTaskId: text("source_task_id")`
- packages/db/src/schema/content_jobs.ts contains `content_jobs_company_status_idx`
- packages/db/src/schema/assets.ts contains `sourceTaskId: text("source_task_id")`
- packages/db/src/schema/index.ts contains `export { contentJobs, CONTENT_JOB_STATUSES } from "./content_jobs.js"`
- packages/shared/src/constants.ts contains `"content_job.queued"`
- packages/shared/src/constants.ts contains `"content_job.done"`
- server/src/attachment-types.ts contains `export const MAX_GENERATED_ASSET_BYTES`
- server/src/attachment-types.ts contains `500 * 1024 * 1024`
- Migration files exist in packages/db/src/migrations/ numbered 0056 or higher
- `pnpm tsc --noEmit` passes for db, shared, and server packages
content_jobs schema defined with all columns and indexes, assets table has sourceTaskId, LIVE_EVENT_TYPES extended with 4 content_job event types, MAX_GENERATED_ASSET_BYTES constant exported, migrations generated and applied, TypeScript compiles clean
Task 2: contentJobStore and contentJobRunner services
server/src/services/content-job-store.ts,
server/src/services/content-job-runner.ts,
server/src/services/index.ts
server/src/services/assets.ts,
server/src/services/live-events.ts,
server/src/services/index.ts,
server/src/storage/types.ts,
packages/db/src/schema/content_jobs.ts
- contentJobStore(db) returns { create, getById, listByCompany, transition }
- create(companyId, { jobType, input, sourceTaskId }) inserts and returns the row
- getById(id) returns job or null
- listByCompany(companyId) returns jobs ordered by createdAt desc
- transition(id, patch) updates the row with patch + updatedAt = new Date()
- contentJobRunner.dispatch(db, storage, job) fires and forgets — never awaited by caller
- Runner transitions job to "running" then to "done" or "failed"
- Runner publishes content_job.running, then content_job.done or content_job.failed via publishLiveEvent
- Runner always awaits DB write BEFORE publishing live event (prevents stale reads)
- On "done": runner calls storage.putFile with namespace "generated", then assetService.create with sourceTaskId, then transitions to done with resultAssetId
- On error: runner transitions to failed with errorMessage
- Runner has a renderContent stub that returns a fixed test buffer (placeholder for phase 41+)
- Runner checks MAX_GENERATED_ASSET_BYTES before calling putFile
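The ordering rules above (DB write first, then live event; size check before storage; failure path on any error) can be sketched independently of the real services. Everything in this block is an illustrative stand-in: `Deps`, `runJobSketch`, and the field names are assumptions, and the real runner would call `contentJobStore`, `storage.putFile`, `assetService`, and `publishLiveEvent` instead.

```typescript
// Illustrative stand-ins for the job row and the transition patch.
type Job = { id: string; companyId: string; jobType: string; sourceTaskId: string | null };
type Patch = { status: string; resultAssetId?: string; errorMessage?: string };

// Hypothetical dependency bundle so the flow can run without a database.
interface Deps {
  transition(id: string, patch: Patch): Promise<void>;
  publish(type: string, payload: Record<string, unknown>): void;
  render(jobType: string): Promise<Uint8Array>;
  storeAsset(companyId: string, body: Uint8Array, sourceTaskId: string | null): Promise<{ id: string }>;
  maxBytes: number;
}

async function runJobSketch(deps: Deps, job: Job): Promise<void> {
  // The DB write is awaited before the event is published, so a subscriber
  // that re-reads the job on "content_job.running" sees the new status.
  await deps.transition(job.id, { status: "running" });
  deps.publish("content_job.running", { jobId: job.id });

  try {
    const body = await deps.render(job.jobType);

    // Enforce the size limit before anything is written to storage.
    if (body.byteLength > deps.maxBytes) {
      throw new Error(`Generated asset exceeds ${deps.maxBytes} bytes`);
    }

    const asset = await deps.storeAsset(job.companyId, body, job.sourceTaskId);

    await deps.transition(job.id, { status: "done", resultAssetId: asset.id });
    deps.publish("content_job.done", { jobId: job.id, assetId: asset.id });
  } catch (err) {
    const errorMessage = err instanceof Error ? err.message : String(err);
    await deps.transition(job.id, { status: "failed", errorMessage });
    deps.publish("content_job.failed", { jobId: job.id, errorMessage });
  }
}
```

Passing the dependencies in as an object is only a convenience for this sketch; it lets the ordering be exercised with in-memory fakes. The plan itself has `runJob` take `db` and `storage` directly.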
1. Create `server/src/services/content-job-store.ts`:
```typescript
import { eq, desc } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import { contentJobs } from "@paperclipai/db";
export function contentJobStore(db: Db) {
  return {
    // Insert a new job; status falls back to the column default ("queued").
    create: (companyId: string, data: { jobType: string; input: Record<string, unknown>; sourceTaskId: string | null }) =>
      db.insert(contentJobs).values({ companyId, ...data }).returning().then((r) => r[0]),
    // Resolve to the job row, or null when no row matches.
    getById: (id: string) =>
      db.select().from(contentJobs).where(eq(contentJobs.id, id)).then((r) => r[0] ?? null),
    // Newest jobs first.
    listByCompany: (companyId: string) =>
      db.select().from(contentJobs)
        .where(eq(contentJobs.companyId, companyId))
        .orderBy(desc(contentJobs.createdAt)),
    // Apply a partial update and bump updatedAt.
    transition: (id: string, patch: Partial<typeof contentJobs.$inferInsert>) =>
      db.update(contentJobs).set({ ...patch, updatedAt: new Date() }).where(eq(contentJobs.id, id)),
  };
}
```
2. Create `server/src/services/content-job-runner.ts`:
- Import publishLiveEvent from "./live-events.js"
- Import contentJobStore from "./content-job-store.js"
- Import assetService from "./assets.js"
- Import MAX_GENERATED_ASSET_BYTES from "../attachment-types.js"
- Import type { Db } from "@paperclipai/db"
- Import type { StorageService } from "../storage/types.js"
- Import type { contentJobs } from "@paperclipai/db"
Define type `ContentJob = typeof contentJobs.$inferSelect`
Export `renderContent` stub function:
```typescript
export async function renderContent(
  _jobType: string,
  _input: Record<string, unknown>,
): Promise<{ filename: string; contentType: string; buffer: Buffer }> {
  // Stub: phases 41-45 will add real renderers keyed by jobType
  return {
    filename: "placeholder.txt",
    contentType: "text/plain",
    buffer: Buffer.from("placeholder output"),
  };
}
```
Export `contentJobRunner` object with a single `dispatch` method:
```typescript
export const contentJobRunner = {
  dispatch(db: Db, storage: StorageService, job: ContentJob): void {
    void runJob(db, storage, job);
  },
};
```
Implement `async function runJob(db, storage, job)`:
- Create store via `contentJobStore(db)`
- Transition to "running" with startedAt: new Date(), then publishLiveEvent type "content_job.running" payload { jobId: job.id }
- Try block:
- Call renderContent(job.jobType, job.input as Record<string, unknown>)
- Check result.buffer.byteLength <= MAX_GENERATED_ASSET_BYTES, throw if exceeded
- Call storage.putFile({ companyId: job.companyId, namespace: "generated", originalFilename: result.filename, contentType: result.contentType, body: result.buffer }) and assign the result to `stored`
- Call assetService(db).create(job.companyId, { ...stored, sourceTaskId: job.sourceTaskId, createdByAgentId: null, createdByUserId: null })
- Transition to "done" with resultAssetId: asset.id, finishedAt: new Date()
- publishLiveEvent type "content_job.done" payload { jobId: job.id, assetId: asset.id }
- Catch block:
- Extract errorMessage from err
- Transition to "failed" with errorMessage, finishedAt: new Date()
- publishLiveEvent type "content_job.failed" payload { jobId: job.id, errorMessage }
3. Modify `server/src/services/index.ts`:
- Add: `export { contentJobStore } from "./content-job-store.js";`
- Add: `export { contentJobRunner } from "./content-job-runner.js";`
Anti-patterns to avoid:
- Do NOT await dispatch in the caller — it must be fire-and-forget (`void`)
- Do NOT publish live event before DB write completes — always `await store.transition(...)` first
- Do NOT import multer anywhere in runner code — runner writes directly via storage.putFile
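One caveat with the fire-and-forget rule: `void` only discards the promise, it does not handle a rejection. If `runJob` rejects outside its own try/catch (for example, when the transition to "failed" itself throws), Node.js reports an unhandled rejection. The sketch below shows one possible guard. `fireAndForget` is a hypothetical helper, not something the plan calls for.

```typescript
// Hypothetical helper: start async work without awaiting it, and route any
// rejection to a handler so it never surfaces as an unhandled rejection.
function fireAndForget(
  work: () => Promise<void>,
  onError: (err: unknown) => void,
): void {
  // `void` marks the promise as intentionally not awaited; `.catch` ensures
  // a rejection is observed.
  void work().catch(onError);
}

// Usage sketch: the caller returns immediately, and failures are collected.
const dispatchErrors: unknown[] = [];
fireAndForget(
  async () => {
    throw new Error("job store unavailable");
  },
  (err) => {
    dispatchErrors.push(err);
  },
);
```

In the runner, the equivalent would be attaching a `.catch` that logs inside `dispatch`. Note that the acceptance criteria below look for the exact string `void runJob(db, storage, job)`, so chaining `.catch` after that call keeps the check satisfied.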
cd /opt/nexus && pnpm tsc --noEmit --project server/tsconfig.json
- server/src/services/content-job-store.ts contains `export function contentJobStore(db: Db)`
- server/src/services/content-job-store.ts contains `db.insert(contentJobs).values(`
- server/src/services/content-job-store.ts contains `transition:`
- server/src/services/content-job-runner.ts contains `export const contentJobRunner`
- server/src/services/content-job-runner.ts contains `void runJob(db, storage, job)`
- server/src/services/content-job-runner.ts contains `publishLiveEvent(`
- server/src/services/content-job-runner.ts contains `content_job.running`
- server/src/services/content-job-runner.ts contains `content_job.done`
- server/src/services/content-job-runner.ts contains `content_job.failed`
- server/src/services/content-job-runner.ts contains `MAX_GENERATED_ASSET_BYTES`
- server/src/services/content-job-runner.ts contains `namespace: "generated"`
- server/src/services/content-job-runner.ts contains `sourceTaskId: job.sourceTaskId`
- server/src/services/content-job-runner.ts contains `export async function renderContent`
- server/src/services/index.ts contains `export { contentJobStore } from "./content-job-store.js"`
- server/src/services/index.ts contains `export { contentJobRunner } from "./content-job-runner.js"`
- `pnpm tsc --noEmit --project server/tsconfig.json` exits 0
contentJobStore provides create/getById/listByCompany/transition. contentJobRunner.dispatch fires and forgets, transitions through lifecycle, stores generated asset with sourceTaskId, publishes live events after DB writes. Both exported from services/index.ts. TypeScript compiles clean.
- `pnpm tsc --noEmit` passes across db, shared, and server packages
- content_jobs table exists in database after migration
- assets table has source_task_id column after migration
- All new exports resolve without import errors
- content_jobs schema has all lifecycle columns with correct types
- assets.sourceTaskId column exists (nullable text)
- LIVE_EVENT_TYPES includes 4 content_job.* entries
- MAX_GENERATED_ASSET_BYTES = 500MB default
- contentJobStore CRUD works against DB
- contentJobRunner dispatches async, transitions lifecycle, stores asset, publishes events
- TypeScript compiles clean across all affected packages