465 lines
14 KiB
TypeScript
465 lines
14 KiB
TypeScript
/**
|
|
* Plugin Job Store — persistence layer for scheduled plugin jobs and their
|
|
* execution history.
|
|
*
|
|
* This service manages the `plugin_jobs` and `plugin_job_runs` tables. It is
|
|
* the server-side backing store for the `ctx.jobs` SDK surface exposed to
|
|
* plugin workers.
|
|
*
|
|
* ## Responsibilities
|
|
*
|
|
* 1. **Sync job declarations** — When a plugin is installed or started, the
|
|
* host calls `syncJobDeclarations()` to upsert the manifest's declared jobs
|
|
* into the `plugin_jobs` table. Jobs removed from the manifest are marked
|
|
* `paused` (not deleted) to preserve history.
|
|
*
|
|
* 2. **Job CRUD** — List, get, pause, and resume jobs for a given plugin.
|
|
*
|
|
* 3. **Run lifecycle** — Create job run records, update their status, and
|
|
* record results (duration, errors, logs).
|
|
*
|
|
* 4. **Next-run calculation** — After a run completes the host should call
|
|
* `updateNextRunAt()` with the next cron tick so the scheduler knows when
|
|
* to fire next.
|
|
*
|
|
* The capability check (`jobs.schedule`) is enforced upstream by the host
|
|
* client factory and manifest validator — this store trusts that the caller
|
|
* has already been authorised.
|
|
*
|
|
* @see PLUGIN_SPEC.md §17 — Scheduled Jobs
|
|
* @see PLUGIN_SPEC.md §21.3 — `plugin_jobs` / `plugin_job_runs` tables
|
|
*/
|
|
|
|
import { and, desc, eq } from "drizzle-orm";
|
|
import type { Db } from "@paperclipai/db";
|
|
import { plugins, pluginJobs, pluginJobRuns } from "@paperclipai/db";
|
|
import type {
|
|
PluginJobDeclaration,
|
|
PluginJobRunStatus,
|
|
PluginJobRunTrigger,
|
|
PluginJobRecord,
|
|
} from "@paperclipai/shared";
|
|
import { notFound } from "../errors.js";
|
|
|
|
/**
|
|
* The statuses used for job *definitions* in the `plugin_jobs` table.
|
|
* Aliased from `PluginJobRecord` to keep the store API aligned with
|
|
* the domain type (`"active" | "paused" | "failed"`).
|
|
*/
|
|
type JobDefinitionStatus = PluginJobRecord["status"];
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
|
|
* Input for creating a job run record.
|
|
*/
|
|
export interface CreateJobRunInput {
|
|
/** FK to the plugin_jobs row. */
|
|
jobId: string;
|
|
/** FK to the plugins row. */
|
|
pluginId: string;
|
|
/** What triggered this run. */
|
|
trigger: PluginJobRunTrigger;
|
|
}
|
|
|
|
/**
|
|
* Input for completing (or failing) a job run.
|
|
*/
|
|
export interface CompleteJobRunInput {
|
|
/** Final run status. */
|
|
status: PluginJobRunStatus;
|
|
/** Error message if the run failed. */
|
|
error?: string | null;
|
|
/** Run duration in milliseconds. */
|
|
durationMs?: number | null;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Service
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
|
|
* Create a PluginJobStore backed by the given Drizzle database instance.
|
|
*
|
|
* @example
|
|
* ```ts
|
|
* const jobStore = pluginJobStore(db);
|
|
*
|
|
* // On plugin install/start — sync declared jobs into the DB
|
|
* await jobStore.syncJobDeclarations(pluginId, manifest.jobs ?? []);
|
|
*
|
|
* // Before dispatching a runJob RPC — create a run record
|
|
* const run = await jobStore.createRun({ jobId, pluginId, trigger: "schedule" });
|
|
*
|
|
* // After the RPC completes — record the result
|
|
* await jobStore.completeRun(run.id, {
|
|
* status: "succeeded",
|
|
* durationMs: Date.now() - startedAt,
|
|
* });
|
|
* ```
|
|
*/
|
|
export function pluginJobStore(db: Db) {
|
|
// -----------------------------------------------------------------------
|
|
// Internal helpers
|
|
// -----------------------------------------------------------------------
|
|
|
|
async function assertPluginExists(pluginId: string): Promise<void> {
|
|
const rows = await db
|
|
.select({ id: plugins.id })
|
|
.from(plugins)
|
|
.where(eq(plugins.id, pluginId));
|
|
if (rows.length === 0) {
|
|
throw notFound(`Plugin not found: ${pluginId}`);
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Public API
|
|
// -----------------------------------------------------------------------
|
|
|
|
return {
|
|
// =====================================================================
|
|
// Job declarations (plugin_jobs)
|
|
// =====================================================================
|
|
|
|
/**
|
|
* Sync declared jobs from a plugin manifest into the `plugin_jobs` table.
|
|
*
|
|
* This is called at plugin install and on each worker startup so the DB
|
|
* always reflects the manifest's declared jobs:
|
|
*
|
|
* - **New jobs** are inserted with status `active`.
|
|
* - **Existing jobs** have their `schedule` updated if it changed.
|
|
* - **Removed jobs** (present in DB but absent from the manifest) are
|
|
* set to `paused` so their history is preserved.
|
|
*
|
|
* The unique constraint `(pluginId, jobKey)` is used for conflict
|
|
* resolution.
|
|
*
|
|
* @param pluginId - UUID of the owning plugin
|
|
* @param declarations - Job declarations from the plugin manifest
|
|
*/
|
|
async syncJobDeclarations(
|
|
pluginId: string,
|
|
declarations: PluginJobDeclaration[],
|
|
): Promise<void> {
|
|
await assertPluginExists(pluginId);
|
|
|
|
// Fetch existing jobs for this plugin
|
|
const existingJobs = await db
|
|
.select()
|
|
.from(pluginJobs)
|
|
.where(eq(pluginJobs.pluginId, pluginId));
|
|
|
|
const existingByKey = new Map(
|
|
existingJobs.map((j) => [j.jobKey, j]),
|
|
);
|
|
|
|
const declaredKeys = new Set<string>();
|
|
|
|
// Upsert each declared job
|
|
for (const decl of declarations) {
|
|
declaredKeys.add(decl.jobKey);
|
|
|
|
const existing = existingByKey.get(decl.jobKey);
|
|
const schedule = decl.schedule ?? "";
|
|
|
|
if (existing) {
|
|
// Update schedule if it changed; re-activate if it was paused
|
|
const updates: Record<string, unknown> = {
|
|
updatedAt: new Date(),
|
|
};
|
|
if (existing.schedule !== schedule) {
|
|
updates.schedule = schedule;
|
|
}
|
|
if (existing.status === "paused") {
|
|
updates.status = "active";
|
|
}
|
|
|
|
await db
|
|
.update(pluginJobs)
|
|
.set(updates)
|
|
.where(eq(pluginJobs.id, existing.id));
|
|
} else {
|
|
// Insert new job
|
|
await db.insert(pluginJobs).values({
|
|
pluginId,
|
|
jobKey: decl.jobKey,
|
|
schedule,
|
|
status: "active",
|
|
});
|
|
}
|
|
}
|
|
|
|
// Pause jobs that are no longer declared in the manifest
|
|
for (const existing of existingJobs) {
|
|
if (!declaredKeys.has(existing.jobKey) && existing.status !== "paused") {
|
|
await db
|
|
.update(pluginJobs)
|
|
.set({ status: "paused", updatedAt: new Date() })
|
|
.where(eq(pluginJobs.id, existing.id));
|
|
}
|
|
}
|
|
},
|
|
|
|
/**
|
|
* List all jobs for a plugin, optionally filtered by status.
|
|
*
|
|
* @param pluginId - UUID of the owning plugin
|
|
* @param status - Optional status filter
|
|
*/
|
|
async listJobs(
|
|
pluginId: string,
|
|
status?: JobDefinitionStatus,
|
|
): Promise<(typeof pluginJobs.$inferSelect)[]> {
|
|
const conditions = [eq(pluginJobs.pluginId, pluginId)];
|
|
if (status) {
|
|
conditions.push(eq(pluginJobs.status, status));
|
|
}
|
|
return db
|
|
.select()
|
|
.from(pluginJobs)
|
|
.where(and(...conditions));
|
|
},
|
|
|
|
/**
|
|
* Get a single job by its composite key `(pluginId, jobKey)`.
|
|
*
|
|
* @param pluginId - UUID of the owning plugin
|
|
* @param jobKey - Stable job identifier from the manifest
|
|
* @returns The job row, or `null` if not found
|
|
*/
|
|
async getJobByKey(
|
|
pluginId: string,
|
|
jobKey: string,
|
|
): Promise<(typeof pluginJobs.$inferSelect) | null> {
|
|
const rows = await db
|
|
.select()
|
|
.from(pluginJobs)
|
|
.where(
|
|
and(
|
|
eq(pluginJobs.pluginId, pluginId),
|
|
eq(pluginJobs.jobKey, jobKey),
|
|
),
|
|
);
|
|
return rows[0] ?? null;
|
|
},
|
|
|
|
/**
|
|
* Get a single job by its primary key (UUID).
|
|
*
|
|
* @param jobId - UUID of the job row
|
|
* @returns The job row, or `null` if not found
|
|
*/
|
|
async getJobById(
|
|
jobId: string,
|
|
): Promise<(typeof pluginJobs.$inferSelect) | null> {
|
|
const rows = await db
|
|
.select()
|
|
.from(pluginJobs)
|
|
.where(eq(pluginJobs.id, jobId));
|
|
return rows[0] ?? null;
|
|
},
|
|
|
|
/**
|
|
* Fetch a single job by ID, scoped to a specific plugin.
|
|
*
|
|
* Returns `null` if the job does not exist or does not belong to the
|
|
* given plugin — callers should treat both cases as "not found".
|
|
*/
|
|
async getJobByIdForPlugin(
|
|
pluginId: string,
|
|
jobId: string,
|
|
): Promise<(typeof pluginJobs.$inferSelect) | null> {
|
|
const rows = await db
|
|
.select()
|
|
.from(pluginJobs)
|
|
.where(and(eq(pluginJobs.id, jobId), eq(pluginJobs.pluginId, pluginId)));
|
|
return rows[0] ?? null;
|
|
},
|
|
|
|
/**
|
|
* Update a job's status.
|
|
*
|
|
* @param jobId - UUID of the job row
|
|
* @param status - New status
|
|
*/
|
|
async updateJobStatus(
|
|
jobId: string,
|
|
status: JobDefinitionStatus,
|
|
): Promise<void> {
|
|
await db
|
|
.update(pluginJobs)
|
|
.set({ status, updatedAt: new Date() })
|
|
.where(eq(pluginJobs.id, jobId));
|
|
},
|
|
|
|
/**
|
|
* Update the `lastRunAt` and `nextRunAt` timestamps on a job.
|
|
*
|
|
* Called by the scheduler after a run completes to advance the
|
|
* scheduling pointer.
|
|
*
|
|
* @param jobId - UUID of the job row
|
|
* @param lastRunAt - When the last run started
|
|
* @param nextRunAt - When the next run should fire
|
|
*/
|
|
async updateRunTimestamps(
|
|
jobId: string,
|
|
lastRunAt: Date,
|
|
nextRunAt: Date | null,
|
|
): Promise<void> {
|
|
await db
|
|
.update(pluginJobs)
|
|
.set({
|
|
lastRunAt,
|
|
nextRunAt,
|
|
updatedAt: new Date(),
|
|
})
|
|
.where(eq(pluginJobs.id, jobId));
|
|
},
|
|
|
|
/**
|
|
* Delete all jobs (and cascaded runs) owned by a plugin.
|
|
*
|
|
* Called during plugin uninstall when `removeData = true`.
|
|
*
|
|
* @param pluginId - UUID of the owning plugin
|
|
*/
|
|
async deleteAllJobs(pluginId: string): Promise<void> {
|
|
await db
|
|
.delete(pluginJobs)
|
|
.where(eq(pluginJobs.pluginId, pluginId));
|
|
},
|
|
|
|
// =====================================================================
|
|
// Job runs (plugin_job_runs)
|
|
// =====================================================================
|
|
|
|
/**
|
|
* Create a new job run record with status `queued`.
|
|
*
|
|
* The caller should create the run record *before* dispatching the
|
|
* `runJob` RPC to the worker, then update it to `running` once the
|
|
* worker begins execution.
|
|
*
|
|
* @param input - Job run input (jobId, pluginId, trigger)
|
|
* @returns The newly created run row
|
|
*/
|
|
async createRun(
|
|
input: CreateJobRunInput,
|
|
): Promise<typeof pluginJobRuns.$inferSelect> {
|
|
const rows = await db
|
|
.insert(pluginJobRuns)
|
|
.values({
|
|
jobId: input.jobId,
|
|
pluginId: input.pluginId,
|
|
trigger: input.trigger,
|
|
status: "queued",
|
|
})
|
|
.returning();
|
|
|
|
return rows[0]!;
|
|
},
|
|
|
|
/**
|
|
* Mark a run as `running` and set its `startedAt` timestamp.
|
|
*
|
|
* @param runId - UUID of the run row
|
|
*/
|
|
async markRunning(runId: string): Promise<void> {
|
|
await db
|
|
.update(pluginJobRuns)
|
|
.set({
|
|
status: "running" as PluginJobRunStatus,
|
|
startedAt: new Date(),
|
|
})
|
|
.where(eq(pluginJobRuns.id, runId));
|
|
},
|
|
|
|
/**
|
|
* Complete a run — set its final status, error, duration, and
|
|
* `finishedAt` timestamp.
|
|
*
|
|
* @param runId - UUID of the run row
|
|
* @param input - Completion details
|
|
*/
|
|
async completeRun(
|
|
runId: string,
|
|
input: CompleteJobRunInput,
|
|
): Promise<void> {
|
|
await db
|
|
.update(pluginJobRuns)
|
|
.set({
|
|
status: input.status,
|
|
error: input.error ?? null,
|
|
durationMs: input.durationMs ?? null,
|
|
finishedAt: new Date(),
|
|
})
|
|
.where(eq(pluginJobRuns.id, runId));
|
|
},
|
|
|
|
/**
|
|
* Get a run by its primary key.
|
|
*
|
|
* @param runId - UUID of the run row
|
|
* @returns The run row, or `null` if not found
|
|
*/
|
|
async getRunById(
|
|
runId: string,
|
|
): Promise<(typeof pluginJobRuns.$inferSelect) | null> {
|
|
const rows = await db
|
|
.select()
|
|
.from(pluginJobRuns)
|
|
.where(eq(pluginJobRuns.id, runId));
|
|
return rows[0] ?? null;
|
|
},
|
|
|
|
/**
|
|
* List runs for a specific job, ordered by creation time descending.
|
|
*
|
|
* @param jobId - UUID of the job
|
|
* @param limit - Maximum number of rows to return (default: 50)
|
|
*/
|
|
async listRunsByJob(
|
|
jobId: string,
|
|
limit = 50,
|
|
): Promise<(typeof pluginJobRuns.$inferSelect)[]> {
|
|
return db
|
|
.select()
|
|
.from(pluginJobRuns)
|
|
.where(eq(pluginJobRuns.jobId, jobId))
|
|
.orderBy(desc(pluginJobRuns.createdAt))
|
|
.limit(limit);
|
|
},
|
|
|
|
/**
|
|
* List runs for a plugin, optionally filtered by status.
|
|
*
|
|
* @param pluginId - UUID of the owning plugin
|
|
* @param status - Optional status filter
|
|
* @param limit - Maximum number of rows to return (default: 50)
|
|
*/
|
|
async listRunsByPlugin(
|
|
pluginId: string,
|
|
status?: PluginJobRunStatus,
|
|
limit = 50,
|
|
): Promise<(typeof pluginJobRuns.$inferSelect)[]> {
|
|
const conditions = [eq(pluginJobRuns.pluginId, pluginId)];
|
|
if (status) {
|
|
conditions.push(eq(pluginJobRuns.status, status));
|
|
}
|
|
return db
|
|
.select()
|
|
.from(pluginJobRuns)
|
|
.where(and(...conditions))
|
|
.orderBy(desc(pluginJobRuns.createdAt))
|
|
.limit(limit);
|
|
},
|
|
};
|
|
}
|
|
|
|
/** Type alias for the return value of `pluginJobStore()`. */
|
|
export type PluginJobStore = ReturnType<typeof pluginJobStore>;
|