import { createHmac, randomUUID } from "node:crypto"; import { eq } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; import { activityLog, agents, companies, companySecrets, companySecretVersions, createDb, executionWorkspaces, heartbeatRuns, instanceSettings, issues, projectWorkspaces, projects, routineRuns, routines, routineTriggers, } from "@paperclipai/db"; import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; import { issueService } from "../services/issues.ts"; import { instanceSettingsService } from "../services/instance-settings.ts"; import { routineService } from "../services/routines.ts"; const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; if (!embeddedPostgresSupport.supported) { console.warn( `Skipping embedded Postgres routines service tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, ); } describeEmbeddedPostgres("routine service live-execution coalescing", () => { let db!: ReturnType; let tempDb: Awaited> | null = null; beforeAll(async () => { tempDb = await startEmbeddedPostgresTestDatabase("paperclip-routines-service-"); db = createDb(tempDb.connectionString); }, 20_000); afterEach(async () => { await db.delete(activityLog); await db.delete(routineRuns); await db.delete(routineTriggers); await db.delete(routines); await db.delete(companySecretVersions); await db.delete(companySecrets); await db.delete(heartbeatRuns); await db.delete(issues); await db.delete(executionWorkspaces); await db.delete(projectWorkspaces); await db.delete(projects); await db.delete(agents); await db.delete(companies); await db.delete(instanceSettings); }); afterAll(async () => { await tempDb?.cleanup(); }); async function seedFixture(opts?: { wakeup?: ( agentId: string, wakeupOpts: { source?: string; triggerDetail?: string; reason?: string | null; payload?: Record | null; requestedByActorType?: "user" | "agent" | "system"; requestedByActorId?: string | null; contextSnapshot?: Record; }, ) => Promise; }) { const companyId = randomUUID(); const agentId = randomUUID(); const projectId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; const wakeups: Array<{ agentId: string; opts: { source?: string; triggerDetail?: string; reason?: string | null; payload?: Record | null; requestedByActorType?: "user" | "agent" | "system"; requestedByActorId?: string | null; contextSnapshot?: Record; }; }> = []; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }); await db.insert(projects).values({ id: projectId, companyId, name: "Routines", status: "in_progress", }); const svc = routineService(db, { heartbeat: { wakeup: async (wakeupAgentId, wakeupOpts) => { wakeups.push({ agentId: wakeupAgentId, opts: wakeupOpts }); if (opts?.wakeup) return opts.wakeup(wakeupAgentId, wakeupOpts); const issueId = (typeof wakeupOpts.payload?.issueId === "string" && wakeupOpts.payload.issueId) || (typeof wakeupOpts.contextSnapshot?.issueId === "string" && wakeupOpts.contextSnapshot.issueId) || null; if (!issueId) return null; const queuedRunId = randomUUID(); await db.insert(heartbeatRuns).values({ id: queuedRunId, companyId, agentId: wakeupAgentId, invocationSource: wakeupOpts.source ?? "assignment", triggerDetail: wakeupOpts.triggerDetail ?? null, status: "queued", contextSnapshot: { ...(wakeupOpts.contextSnapshot ?? {}), issueId }, }); await db .update(issues) .set({ executionRunId: queuedRunId, executionLockedAt: new Date(), }) .where(eq(issues.id, issueId)); return { id: queuedRunId }; }, }, }); const issueSvc = issueService(db); const routine = await svc.create( companyId, { projectId, goalId: null, parentIssueId: null, title: "ascii frog", description: "Run the frog routine", assigneeAgentId: agentId, priority: "medium", status: "active", concurrencyPolicy: "coalesce_if_active", catchUpPolicy: "skip_missed", }, {}, ); return { companyId, agentId, issueSvc, projectId, routine, svc, wakeups }; } it("creates a fresh execution issue when the previous routine issue is open but idle", async () => { const { companyId, issueSvc, routine, svc } = await seedFixture(); const previousRunId = randomUUID(); const previousIssue = await issueSvc.create(companyId, { projectId: routine.projectId, title: routine.title, description: routine.description, status: "todo", priority: routine.priority, assigneeAgentId: routine.assigneeAgentId, originKind: "routine_execution", originId: routine.id, originRunId: previousRunId, }); await db.insert(routineRuns).values({ id: previousRunId, companyId, routineId: routine.id, triggerId: null, source: "manual", status: "issue_created", triggeredAt: new Date("2026-03-20T12:00:00.000Z"), linkedIssueId: previousIssue.id, completedAt: new Date("2026-03-20T12:00:00.000Z"), }); const detailBefore = await svc.getDetail(routine.id); expect(detailBefore?.activeIssue).toBeNull(); const run = await svc.runRoutine(routine.id, { source: "manual" }); expect(run.status).toBe("issue_created"); expect(run.linkedIssueId).not.toBe(previousIssue.id); const routineIssues = await db .select({ id: issues.id, originRunId: issues.originRunId, }) .from(issues) .where(eq(issues.originId, routine.id)); expect(routineIssues).toHaveLength(2); expect(routineIssues.map((issue) => issue.id)).toContain(previousIssue.id); expect(routineIssues.map((issue) => issue.id)).toContain(run.linkedIssueId); }); it("wakes the assignee when a routine creates a fresh execution issue", async () => { const { agentId, routine, svc, wakeups } = await seedFixture(); const run = await svc.runRoutine(routine.id, { source: "manual" }); expect(run.status).toBe("issue_created"); expect(run.linkedIssueId).toBeTruthy(); expect(wakeups).toEqual([ { agentId, opts: { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: run.linkedIssueId, mutation: "create" }, requestedByActorType: undefined, requestedByActorId: null, contextSnapshot: { issueId: run.linkedIssueId, source: "routine.dispatch" }, }, }, ]); }); it("waits for the assignee wakeup to be queued before returning the routine run", async () => { let wakeupResolved = false; const { routine, svc } = await seedFixture({ wakeup: async () => { await new Promise((resolve) => setTimeout(resolve, 10)); wakeupResolved = true; return null; }, }); const run = await svc.runRoutine(routine.id, { source: "manual" }); expect(run.status).toBe("issue_created"); expect(wakeupResolved).toBe(true); }); it("coalesces only when the existing routine issue has a live execution run", async () => { const { agentId, companyId, issueSvc, routine, svc } = await seedFixture(); const previousRunId = randomUUID(); const liveHeartbeatRunId = randomUUID(); const previousIssue = await issueSvc.create(companyId, { projectId: routine.projectId, title: routine.title, description: routine.description, status: "in_progress", priority: routine.priority, assigneeAgentId: routine.assigneeAgentId, originKind: "routine_execution", originId: routine.id, originRunId: previousRunId, }); await db.insert(routineRuns).values({ id: previousRunId, companyId, routineId: routine.id, triggerId: null, source: "manual", status: "issue_created", triggeredAt: new Date("2026-03-20T12:00:00.000Z"), linkedIssueId: previousIssue.id, }); await db.insert(heartbeatRuns).values({ id: liveHeartbeatRunId, companyId, agentId, invocationSource: "assignment", triggerDetail: "system", status: "running", contextSnapshot: { issueId: previousIssue.id }, startedAt: new Date("2026-03-20T12:01:00.000Z"), }); await db .update(issues) .set({ checkoutRunId: liveHeartbeatRunId, executionRunId: liveHeartbeatRunId, executionLockedAt: new Date("2026-03-20T12:01:00.000Z"), }) .where(eq(issues.id, previousIssue.id)); const detailBefore = await svc.getDetail(routine.id); expect(detailBefore?.activeIssue?.id).toBe(previousIssue.id); const run = await svc.runRoutine(routine.id, { source: "manual" }); expect(run.status).toBe("coalesced"); expect(run.linkedIssueId).toBe(previousIssue.id); expect(run.coalescedIntoRunId).toBe(previousRunId); const routineIssues = await db .select({ id: issues.id }) .from(issues) .where(eq(issues.originId, routine.id)); expect(routineIssues).toHaveLength(1); expect(routineIssues[0]?.id).toBe(previousIssue.id); }); it("interpolates routine variables into the execution issue and stores resolved values", async () => { const { companyId, agentId, projectId, svc } = await seedFixture(); const variableRoutine = await svc.create( companyId, { projectId, goalId: null, parentIssueId: null, title: "repo triage", description: "Review {{repo}} for {{priority}} bugs", assigneeAgentId: agentId, priority: "medium", status: "active", concurrencyPolicy: "coalesce_if_active", catchUpPolicy: "skip_missed", variables: [ { name: "repo", label: null, type: "text", defaultValue: null, required: true, options: [] }, { name: "priority", label: null, type: "select", defaultValue: "high", required: true, options: ["high", "low"] }, ], }, {}, ); const run = await svc.runRoutine(variableRoutine.id, { source: "manual", variables: { repo: "paperclip" }, }); const storedIssue = await db .select({ description: issues.description }) .from(issues) .where(eq(issues.id, run.linkedIssueId!)) .then((rows) => rows[0] ?? null); const storedRun = await db .select({ triggerPayload: routineRuns.triggerPayload }) .from(routineRuns) .where(eq(routineRuns.id, run.id)) .then((rows) => rows[0] ?? null); expect(storedIssue?.description).toBe("Review paperclip for high bugs"); expect(storedRun?.triggerPayload).toEqual({ variables: { repo: "paperclip", priority: "high", }, }); }); it("attaches the selected execution workspace to manually triggered routine issues", async () => { const { companyId, projectId, routine, svc } = await seedFixture(); const projectWorkspaceId = randomUUID(); const executionWorkspaceId = randomUUID(); await instanceSettingsService(db).updateExperimental({ enableIsolatedWorkspaces: true }); await db .update(projects) .set({ executionWorkspacePolicy: { enabled: true, defaultMode: "shared_workspace", defaultProjectWorkspaceId: projectWorkspaceId, }, }) .where(eq(projects.id, projectId)); await db.insert(projectWorkspaces).values({ id: projectWorkspaceId, companyId, projectId, name: "Primary workspace", isPrimary: true, sharedWorkspaceKey: "routine-primary", }); await db.insert(executionWorkspaces).values({ id: executionWorkspaceId, companyId, projectId, projectWorkspaceId, mode: "isolated_workspace", strategyType: "git_worktree", name: "Routine worktree", status: "active", providerType: "git_worktree", }); const run = await svc.runRoutine(routine.id, { source: "manual", executionWorkspaceId, executionWorkspacePreference: "reuse_existing", executionWorkspaceSettings: { mode: "isolated_workspace" }, }); const storedIssue = await db .select({ projectWorkspaceId: issues.projectWorkspaceId, executionWorkspaceId: issues.executionWorkspaceId, executionWorkspacePreference: issues.executionWorkspacePreference, executionWorkspaceSettings: issues.executionWorkspaceSettings, }) .from(issues) .where(eq(issues.id, run.linkedIssueId!)) .then((rows) => rows[0] ?? null); expect(storedIssue).toEqual({ projectWorkspaceId, executionWorkspaceId, executionWorkspacePreference: "reuse_existing", executionWorkspaceSettings: { mode: "isolated_workspace" }, }); }); it("blocks schedule triggers when required variables do not have defaults", async () => { const { companyId, agentId, projectId, svc } = await seedFixture(); const variableRoutine = await svc.create( companyId, { projectId, goalId: null, parentIssueId: null, title: "repo triage", description: "Review {{repo}}", assigneeAgentId: agentId, priority: "medium", status: "active", concurrencyPolicy: "coalesce_if_active", catchUpPolicy: "skip_missed", variables: [ { name: "repo", label: null, type: "text", defaultValue: null, required: true, options: [] }, ], }, {}, ); await expect( svc.createTrigger(variableRoutine.id, { kind: "schedule", label: "daily", cronExpression: "0 10 * * *", timezone: "UTC", }, {}), ).rejects.toThrow(/require defaults for required variables/i); }); it("treats malformed stored defaults as missing when validating schedule triggers", async () => { const { companyId, agentId, projectId, svc } = await seedFixture(); const variableRoutine = await svc.create( companyId, { projectId, goalId: null, parentIssueId: null, title: "ship check", description: "Review {{approved}}", assigneeAgentId: agentId, priority: "medium", status: "active", concurrencyPolicy: "coalesce_if_active", catchUpPolicy: "skip_missed", variables: [ { name: "approved", label: null, type: "boolean", defaultValue: true, required: true, options: [] }, ], }, {}, ); await db .update(routines) .set({ variables: [ { name: "approved", label: null, type: "boolean", defaultValue: "definitely", required: true, options: [], }, ], }) .where(eq(routines.id, variableRoutine.id)); await expect( svc.createTrigger(variableRoutine.id, { kind: "schedule", label: "daily", cronExpression: "0 10 * * *", timezone: "UTC", }, {}), ).rejects.toThrow(/require defaults for required variables/i); }); it("serializes concurrent dispatches until the first execution issue is linked to a queued run", async () => { const { routine, svc } = await seedFixture({ wakeup: async (wakeupAgentId, wakeupOpts) => { const issueId = (typeof wakeupOpts.payload?.issueId === "string" && wakeupOpts.payload.issueId) || (typeof wakeupOpts.contextSnapshot?.issueId === "string" && wakeupOpts.contextSnapshot.issueId) || null; await new Promise((resolve) => setTimeout(resolve, 25)); if (!issueId) return null; const queuedRunId = randomUUID(); await db.insert(heartbeatRuns).values({ id: queuedRunId, companyId: routine.companyId, agentId: wakeupAgentId, invocationSource: wakeupOpts.source ?? "assignment", triggerDetail: wakeupOpts.triggerDetail ?? null, status: "queued", contextSnapshot: { ...(wakeupOpts.contextSnapshot ?? {}), issueId }, }); await db .update(issues) .set({ executionRunId: queuedRunId, executionLockedAt: new Date(), }) .where(eq(issues.id, issueId)); return { id: queuedRunId }; }, }); const [first, second] = await Promise.all([ svc.runRoutine(routine.id, { source: "manual" }), svc.runRoutine(routine.id, { source: "manual" }), ]); expect([first.status, second.status].sort()).toEqual(["coalesced", "issue_created"]); expect(first.linkedIssueId).toBeTruthy(); expect(second.linkedIssueId).toBeTruthy(); expect(first.linkedIssueId).toBe(second.linkedIssueId); const routineIssues = await db .select({ id: issues.id }) .from(issues) .where(eq(issues.originId, routine.id)); expect(routineIssues).toHaveLength(1); }); it("fails the run and cleans up the execution issue when wakeup queueing fails", async () => { const { routine, svc } = await seedFixture({ wakeup: async () => { throw new Error("queue unavailable"); }, }); const run = await svc.runRoutine(routine.id, { source: "manual" }); expect(run.status).toBe("failed"); expect(run.failureReason).toContain("queue unavailable"); expect(run.linkedIssueId).toBeNull(); const routineIssues = await db .select({ id: issues.id }) .from(issues) .where(eq(issues.originId, routine.id)); expect(routineIssues).toHaveLength(0); }); it("accepts standard second-precision webhook timestamps for HMAC triggers", async () => { const { routine, svc } = await seedFixture(); const { trigger, secretMaterial } = await svc.createTrigger( routine.id, { kind: "webhook", signingMode: "hmac_sha256", replayWindowSec: 300, }, {}, ); expect(trigger.publicId).toBeTruthy(); expect(secretMaterial?.webhookSecret).toBeTruthy(); const payload = { ok: true }; const rawBody = Buffer.from(JSON.stringify(payload)); const timestampSeconds = String(Math.floor(Date.now() / 1000)); const signature = `sha256=${createHmac("sha256", secretMaterial!.webhookSecret) .update(`${timestampSeconds}.`) .update(rawBody) .digest("hex")}`; const run = await svc.firePublicTrigger(trigger.publicId!, { signatureHeader: signature, timestampHeader: timestampSeconds, rawBody, payload, }); expect(run.source).toBe("webhook"); expect(run.status).toBe("issue_created"); expect(run.linkedIssueId).toBeTruthy(); }); });