diff --git a/doc/DEVELOPING.md b/doc/DEVELOPING.md index 7864b90e..97f70086 100644 --- a/doc/DEVELOPING.md +++ b/doc/DEVELOPING.md @@ -39,6 +39,15 @@ This starts: `pnpm dev` runs the server in watch mode and restarts on changes from workspace packages (including adapter packages). Use `pnpm dev:once` to run without file watching. +`pnpm dev` and `pnpm dev:once` are now idempotent for the current repo and instance: if the matching Paperclip dev runner is already alive, Paperclip reports the existing process instead of starting a duplicate. + +Inspect or stop the current repo's managed dev runner: + +```sh +pnpm dev:list +pnpm dev:stop +``` + `pnpm dev:once` now tracks backend-relevant file changes and pending migrations. When the current boot is stale, the board UI shows a `Restart required` banner. You can also enable guarded auto-restart in `Instance Settings > Experimental`, which waits for queued/running local agent runs to finish before restarting the dev server. Tailscale/private-auth dev mode: diff --git a/package.json b/package.json index 749cc8d0..d7718aa3 100644 --- a/package.json +++ b/package.json @@ -3,9 +3,11 @@ "private": true, "type": "module", "scripts": { - "dev": "node scripts/dev-runner.mjs watch", - "dev:watch": "node scripts/dev-runner.mjs watch", - "dev:once": "node scripts/dev-runner.mjs dev", + "dev": "pnpm --filter @paperclipai/server exec tsx ../scripts/dev-runner.ts watch", + "dev:watch": "pnpm --filter @paperclipai/server exec tsx ../scripts/dev-runner.ts watch", + "dev:once": "pnpm --filter @paperclipai/server exec tsx ../scripts/dev-runner.ts dev", + "dev:list": "pnpm --filter @paperclipai/server exec tsx ../scripts/dev-service.ts list", + "dev:stop": "pnpm --filter @paperclipai/server exec tsx ../scripts/dev-service.ts stop", "dev:server": "pnpm --filter @paperclipai/server dev", "dev:ui": "pnpm --filter @paperclipai/ui dev", "build": "pnpm -r build", diff --git a/scripts/dev-runner.ts b/scripts/dev-runner.ts new file mode 100644 index 00000000..4ee46395 --- /dev/null +++ b/scripts/dev-runner.ts @@ -0,0 +1,656 @@ +#!/usr/bin/env -S node --import tsx +import { spawn } from "node:child_process"; +import { existsSync, mkdirSync, readdirSync, rmSync, statSync, writeFileSync } from "node:fs"; +import path from "node:path"; +import { createInterface } from "node:readline/promises"; +import { stdin, stdout } from "node:process"; +import { shouldTrackDevServerPath } from "./dev-runner-paths.mjs"; +import { createDevServiceIdentity, repoRoot } from "./dev-service-profile.ts"; +import { + findAdoptableLocalService, + removeLocalServiceRegistryRecord, + touchLocalServiceRegistryRecord, + writeLocalServiceRegistryRecord, +} from "../server/src/services/local-service-supervisor.ts"; + +const mode = process.argv[2] === "watch" ? "watch" : "dev"; +const cliArgs = process.argv.slice(3); +const scanIntervalMs = 1500; +const autoRestartPollIntervalMs = 2500; +const gracefulShutdownTimeoutMs = 10_000; +const changedPathSampleLimit = 5; +const devServerStatusFilePath = path.join(repoRoot, ".paperclip", "dev-server-status.json"); + +const watchedDirectories = [ + "cli", + "scripts", + "server", + "packages/adapter-utils", + "packages/adapters", + "packages/db", + "packages/plugins/sdk", + "packages/shared", +].map((relativePath) => path.join(repoRoot, relativePath)); + +const watchedFiles = [ + ".env", + "package.json", + "pnpm-workspace.yaml", + "tsconfig.base.json", + "tsconfig.json", + "vitest.config.ts", +].map((relativePath) => path.join(repoRoot, relativePath)); + +const ignoredDirectoryNames = new Set([ + ".git", + ".turbo", + ".vite", + "coverage", + "dist", + "node_modules", + "ui-dist", +]); + +const ignoredRelativePaths = new Set([ + ".paperclip/dev-server-status.json", +]); + +const tailscaleAuthFlagNames = new Set([ + "--tailscale-auth", + "--authenticated-private", +]); + +let tailscaleAuth = false; +const forwardedArgs: string[] = []; + +for (const arg of cliArgs) { + if (tailscaleAuthFlagNames.has(arg)) { + tailscaleAuth = true; + continue; + } + forwardedArgs.push(arg); +} + +if (process.env.npm_config_tailscale_auth === "true") { + tailscaleAuth = true; +} +if (process.env.npm_config_authenticated_private === "true") { + tailscaleAuth = true; +} + +const env: NodeJS.ProcessEnv = { + ...process.env, + PAPERCLIP_UI_DEV_MIDDLEWARE: "true", +}; + +if (mode === "dev") { + env.PAPERCLIP_DEV_SERVER_STATUS_FILE = devServerStatusFilePath; +} + +if (mode === "watch") { + env.PAPERCLIP_MIGRATION_PROMPT ??= "never"; + env.PAPERCLIP_MIGRATION_AUTO_APPLY ??= "true"; +} + +if (tailscaleAuth) { + env.PAPERCLIP_DEPLOYMENT_MODE = "authenticated"; + env.PAPERCLIP_DEPLOYMENT_EXPOSURE = "private"; + env.PAPERCLIP_AUTH_BASE_URL_MODE = "auto"; + env.HOST = "0.0.0.0"; + console.log("[paperclip] dev mode: authenticated/private (tailscale-friendly) on 0.0.0.0"); +} else { + console.log("[paperclip] dev mode: local_trusted (default)"); +} + +const serverPort = Number.parseInt(env.PORT ?? process.env.PORT ?? "3100", 10) || 3100; +const devService = createDevServiceIdentity({ + mode, + forwardedArgs, + tailscaleAuth, + port: serverPort, +}); + +const existingRunner = await findAdoptableLocalService({ + serviceKey: devService.serviceKey, + cwd: repoRoot, + envFingerprint: devService.envFingerprint, + port: serverPort, +}); +if (existingRunner) { + console.log( + `[paperclip] ${devService.serviceName} already running (pid ${existingRunner.pid}${typeof existingRunner.metadata?.childPid === "number" ? `, child ${existingRunner.metadata.childPid}` : ""})`, + ); + process.exit(0); +} + +const pnpmBin = process.platform === "win32" ? "pnpm.cmd" : "pnpm"; +let previousSnapshot = collectWatchedSnapshot(); +let dirtyPaths = new Set(); +let pendingMigrations: string[] = []; +let lastChangedAt: string | null = null; +let lastRestartAt: string | null = null; +let scanInFlight = false; +let restartInFlight = false; +let shuttingDown = false; +let childExitWasExpected = false; +let child: ReturnType | null = null; +let childExitPromise: Promise<{ code: number; signal: NodeJS.Signals | null }> | null = null; +let scanTimer: ReturnType | null = null; +let autoRestartTimer: ReturnType | null = null; + +function toError(error: unknown, context = "Dev runner command failed") { + if (error instanceof Error) return error; + if (error === undefined) return new Error(context); + if (typeof error === "string") return new Error(`${context}: ${error}`); + + try { + return new Error(`${context}: ${JSON.stringify(error)}`); + } catch { + return new Error(`${context}: ${String(error)}`); + } +} + +process.on("uncaughtException", async (error) => { + await removeLocalServiceRegistryRecord(devService.serviceKey); + const err = toError(error, "Uncaught exception in dev runner"); + process.stderr.write(`${err.stack ?? err.message}\n`); + process.exit(1); +}); + +process.on("unhandledRejection", async (reason) => { + await removeLocalServiceRegistryRecord(devService.serviceKey); + const err = toError(reason, "Unhandled promise rejection in dev runner"); + process.stderr.write(`${err.stack ?? err.message}\n`); + process.exit(1); +}); + +function formatPendingMigrationSummary(migrations: string[]) { + if (migrations.length === 0) return "none"; + return migrations.length > 3 + ? `${migrations.slice(0, 3).join(", ")} (+${migrations.length - 3} more)` + : migrations.join(", "); +} + +function exitForSignal(signal: NodeJS.Signals) { + if (signal === "SIGINT") { + process.exit(130); + } + if (signal === "SIGTERM") { + process.exit(143); + } + process.exit(1); +} + +function toRelativePath(absolutePath: string) { + return path.relative(repoRoot, absolutePath).split(path.sep).join("/"); +} + +function readSignature(absolutePath: string) { + const stats = statSync(absolutePath); + return `${Math.trunc(stats.mtimeMs)}:${stats.size}`; +} + +function addFileToSnapshot(snapshot: Map, absolutePath: string) { + const relativePath = toRelativePath(absolutePath); + if (ignoredRelativePaths.has(relativePath)) return; + if (!shouldTrackDevServerPath(relativePath)) return; + snapshot.set(relativePath, readSignature(absolutePath)); +} + +function walkDirectory(snapshot: Map, absoluteDirectory: string) { + if (!existsSync(absoluteDirectory)) return; + + for (const entry of readdirSync(absoluteDirectory, { withFileTypes: true })) { + if (ignoredDirectoryNames.has(entry.name)) continue; + + const absolutePath = path.join(absoluteDirectory, entry.name); + if (entry.isDirectory()) { + walkDirectory(snapshot, absolutePath); + continue; + } + if (entry.isFile() || entry.isSymbolicLink()) { + addFileToSnapshot(snapshot, absolutePath); + } + } +} + +function collectWatchedSnapshot() { + const snapshot = new Map(); + + for (const absoluteDirectory of watchedDirectories) { + walkDirectory(snapshot, absoluteDirectory); + } + for (const absoluteFile of watchedFiles) { + if (!existsSync(absoluteFile)) continue; + addFileToSnapshot(snapshot, absoluteFile); + } + + return snapshot; +} + +function diffSnapshots(previous: Map, next: Map) { + const changed = new Set(); + + for (const [relativePath, signature] of next) { + if (previous.get(relativePath) !== signature) { + changed.add(relativePath); + } + } + for (const relativePath of previous.keys()) { + if (!next.has(relativePath)) { + changed.add(relativePath); + } + } + + return [...changed].sort(); +} + +function ensureDevStatusDirectory() { + mkdirSync(path.dirname(devServerStatusFilePath), { recursive: true }); +} + +function writeDevServerStatus() { + if (mode !== "dev") return; + + ensureDevStatusDirectory(); + const changedPaths = [...dirtyPaths].sort(); + writeFileSync( + devServerStatusFilePath, + `${JSON.stringify({ + dirty: changedPaths.length > 0 || pendingMigrations.length > 0, + lastChangedAt, + changedPathCount: changedPaths.length, + changedPathsSample: changedPaths.slice(0, changedPathSampleLimit), + pendingMigrations, + lastRestartAt, + }, null, 2)}\n`, + "utf8", + ); +} + +function clearDevServerStatus() { + if (mode !== "dev") return; + rmSync(devServerStatusFilePath, { force: true }); +} + +async function updateDevServiceRecord(extra?: Record) { + await writeLocalServiceRegistryRecord({ + version: 1, + serviceKey: devService.serviceKey, + profileKind: "paperclip-dev", + serviceName: devService.serviceName, + command: "dev-runner.ts", + cwd: repoRoot, + envFingerprint: devService.envFingerprint, + port: serverPort, + url: `http://127.0.0.1:${serverPort}`, + pid: process.pid, + processGroupId: null, + provider: "local_process", + runtimeServiceId: null, + reuseKey: null, + startedAt: lastRestartAt ?? new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { + repoRoot, + mode, + childPid: child?.pid ?? null, + url: `http://127.0.0.1:${serverPort}`, + ...extra, + }, + }); +} + +async function runPnpm(args: string[], options: { + stdio?: "inherit" | ["ignore", "pipe", "pipe"]; + env?: NodeJS.ProcessEnv; +} = {}) { + return await new Promise<{ code: number; signal: NodeJS.Signals | null; stdout: string; stderr: string }>((resolve, reject) => { + const spawned = spawn(pnpmBin, args, { + stdio: options.stdio ?? ["ignore", "pipe", "pipe"], + env: options.env ?? process.env, + shell: process.platform === "win32", + }); + + let stdoutBuffer = ""; + let stderrBuffer = ""; + + if (spawned.stdout) { + spawned.stdout.on("data", (chunk) => { + stdoutBuffer += String(chunk); + }); + } + if (spawned.stderr) { + spawned.stderr.on("data", (chunk) => { + stderrBuffer += String(chunk); + }); + } + + spawned.on("error", reject); + spawned.on("exit", (code, signal) => { + resolve({ + code: code ?? 0, + signal, + stdout: stdoutBuffer, + stderr: stderrBuffer, + }); + }); + }); +} + +async function getMigrationStatusPayload() { + const status = await runPnpm( + ["--filter", "@paperclipai/db", "exec", "tsx", "src/migration-status.ts", "--json"], + { env }, + ); + if (status.code !== 0) { + process.stderr.write( + status.stderr || + status.stdout || + `[paperclip] Command failed with code ${status.code}: pnpm --filter @paperclipai/db exec tsx src/migration-status.ts --json\n`, + ); + process.exit(status.code); + } + + try { + return JSON.parse(status.stdout.trim()) as { status?: string; pendingMigrations?: string[] }; + } catch (error) { + process.stderr.write( + status.stderr || + status.stdout || + "[paperclip] migration-status returned invalid JSON payload\n", + ); + throw toError(error, "Unable to parse migration-status JSON output"); + } +} + +async function refreshPendingMigrations() { + const payload = await getMigrationStatusPayload(); + pendingMigrations = + payload.status === "needsMigrations" && Array.isArray(payload.pendingMigrations) + ? payload.pendingMigrations.filter((entry) => typeof entry === "string" && entry.trim().length > 0) + : []; + writeDevServerStatus(); + return payload; +} + +async function maybePreflightMigrations(options: { interactive?: boolean; autoApply?: boolean; exitOnDecline?: boolean } = {}) { + const interactive = options.interactive ?? mode === "watch"; + const autoApply = options.autoApply ?? env.PAPERCLIP_MIGRATION_AUTO_APPLY === "true"; + const exitOnDecline = options.exitOnDecline ?? mode === "watch"; + + const payload = await refreshPendingMigrations(); + if (payload.status !== "needsMigrations" || pendingMigrations.length === 0) { + return; + } + + let shouldApply = autoApply; + + if (!autoApply && interactive) { + if (!stdin.isTTY || !stdout.isTTY) { + shouldApply = true; + } else { + const prompt = createInterface({ input: stdin, output: stdout }); + try { + const answer = ( + await prompt.question( + `Apply pending migrations (${formatPendingMigrationSummary(pendingMigrations)}) now? (y/N): `, + ) + ) + .trim() + .toLowerCase(); + shouldApply = answer === "y" || answer === "yes"; + } finally { + prompt.close(); + } + } + } + + if (!shouldApply) { + if (exitOnDecline) { + process.stderr.write( + `[paperclip] Pending migrations detected (${formatPendingMigrationSummary(pendingMigrations)}). Refusing to start watch mode against a stale schema.\n`, + ); + process.exit(1); + } + return; + } + + const migrate = spawn(pnpmBin, ["db:migrate"], { + stdio: "inherit", + env, + shell: process.platform === "win32", + }); + const exit = await new Promise<{ code: number; signal: NodeJS.Signals | null }>((resolve) => { + migrate.on("exit", (code, signal) => resolve({ code: code ?? 0, signal })); + }); + if (exit.signal) { + exitForSignal(exit.signal); + return; + } + if (exit.code !== 0) { + process.exit(exit.code); + } + + await refreshPendingMigrations(); +} + +async function buildPluginSdk() { + console.log("[paperclip] building plugin sdk..."); + const result = await runPnpm( + ["--filter", "@paperclipai/plugin-sdk", "build"], + { stdio: "inherit" }, + ); + if (result.signal) { + exitForSignal(result.signal); + return; + } + if (result.code !== 0) { + console.error("[paperclip] plugin sdk build failed"); + process.exit(result.code); + } +} + +async function markChildAsCurrent() { + previousSnapshot = collectWatchedSnapshot(); + dirtyPaths = new Set(); + lastChangedAt = null; + lastRestartAt = new Date().toISOString(); + await refreshPendingMigrations(); + await updateDevServiceRecord(); +} + +async function scanForBackendChanges() { + if (mode !== "dev" || scanInFlight || restartInFlight) return; + scanInFlight = true; + try { + const nextSnapshot = collectWatchedSnapshot(); + const changed = diffSnapshots(previousSnapshot, nextSnapshot); + previousSnapshot = nextSnapshot; + if (changed.length === 0) return; + + for (const relativePath of changed) { + dirtyPaths.add(relativePath); + } + lastChangedAt = new Date().toISOString(); + await refreshPendingMigrations(); + } finally { + scanInFlight = false; + } +} + +async function getDevHealthPayload() { + const response = await fetch(`http://127.0.0.1:${serverPort}/api/health`); + if (!response.ok) { + throw new Error(`Health request failed (${response.status})`); + } + return await response.json(); +} + +async function waitForChildExit() { + if (!childExitPromise) { + return { code: 0, signal: null }; + } + return await childExitPromise; +} + +async function stopChildForRestart() { + if (!child) return { code: 0, signal: null }; + childExitWasExpected = true; + child.kill("SIGTERM"); + const killTimer = setTimeout(() => { + if (child) { + child.kill("SIGKILL"); + } + }, gracefulShutdownTimeoutMs); + try { + return await waitForChildExit(); + } finally { + clearTimeout(killTimer); + } +} + +async function startServerChild() { + await buildPluginSdk(); + + const serverScript = mode === "watch" ? "dev:watch" : "dev"; + child = spawn( + pnpmBin, + ["--filter", "@paperclipai/server", serverScript, ...forwardedArgs], + { stdio: "inherit", env, shell: process.platform === "win32" }, + ); + + childExitPromise = new Promise((resolve, reject) => { + child?.on("error", reject); + child?.on("exit", (code, signal) => { + const expected = childExitWasExpected; + childExitWasExpected = false; + child = null; + childExitPromise = null; + void touchLocalServiceRegistryRecord(devService.serviceKey, { + metadata: { + repoRoot, + mode, + childPid: null, + url: `http://127.0.0.1:${serverPort}`, + }, + }); + resolve({ code: code ?? 0, signal }); + + if (restartInFlight || expected || shuttingDown) { + return; + } + if (signal) { + exitForSignal(signal); + return; + } + process.exit(code ?? 0); + }); + }); + + await markChildAsCurrent(); +} + +async function maybeAutoRestartChild() { + if (mode !== "dev" || restartInFlight || !child) return; + if (dirtyPaths.size === 0 && pendingMigrations.length === 0) return; + + restartInFlight = true; + let health: { devServer?: { enabled?: boolean; autoRestartEnabled?: boolean; activeRunCount?: number } } | null = null; + try { + health = await getDevHealthPayload(); + } catch { + restartInFlight = false; + return; + } + + const devServer = health?.devServer; + if (!devServer?.enabled || devServer.autoRestartEnabled !== true) { + restartInFlight = false; + return; + } + if ((devServer.activeRunCount ?? 0) > 0) { + restartInFlight = false; + return; + } + + try { + await maybePreflightMigrations({ + autoApply: true, + interactive: false, + exitOnDecline: false, + }); + await stopChildForRestart(); + await startServerChild(); + } catch (error) { + const err = toError(error, "Auto-restart failed"); + process.stderr.write(`${err.stack ?? err.message}\n`); + process.exit(1); + } finally { + restartInFlight = false; + } +} + +function installDevIntervals() { + if (mode !== "dev") return; + + scanTimer = setInterval(() => { + void scanForBackendChanges(); + }, scanIntervalMs); + autoRestartTimer = setInterval(() => { + void maybeAutoRestartChild(); + }, autoRestartPollIntervalMs); +} + +function clearDevIntervals() { + if (scanTimer) { + clearInterval(scanTimer); + scanTimer = null; + } + if (autoRestartTimer) { + clearInterval(autoRestartTimer); + autoRestartTimer = null; + } +} + +async function shutdown(signal: NodeJS.Signals) { + if (shuttingDown) return; + shuttingDown = true; + clearDevIntervals(); + clearDevServerStatus(); + await removeLocalServiceRegistryRecord(devService.serviceKey); + + if (!child) { + exitForSignal(signal); + return; + } + + childExitWasExpected = true; + child.kill(signal); + const exit = await waitForChildExit(); + if (exit.signal) { + exitForSignal(exit.signal); + return; + } + process.exit(exit.code ?? 0); +} + +process.on("SIGINT", () => { + void shutdown("SIGINT"); +}); +process.on("SIGTERM", () => { + void shutdown("SIGTERM"); +}); + +await maybePreflightMigrations(); +await startServerChild(); +installDevIntervals(); + +if (mode === "watch") { + const exit = await waitForChildExit(); + await removeLocalServiceRegistryRecord(devService.serviceKey); + if (exit.signal) { + exitForSignal(exit.signal); + } + process.exit(exit.code ?? 0); +} diff --git a/scripts/dev-service-profile.ts b/scripts/dev-service-profile.ts new file mode 100644 index 00000000..9c129b34 --- /dev/null +++ b/scripts/dev-service-profile.ts @@ -0,0 +1,44 @@ +import { createHash } from "node:crypto"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; +import { createLocalServiceKey } from "../server/src/services/local-service-supervisor.ts"; + +export const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), ".."); + +export function createDevServiceIdentity(input: { + mode: "watch" | "dev"; + forwardedArgs: string[]; + tailscaleAuth: boolean; + port: number; +}) { + const envFingerprint = createHash("sha256") + .update( + JSON.stringify({ + mode: input.mode, + forwardedArgs: input.forwardedArgs, + tailscaleAuth: input.tailscaleAuth, + port: input.port, + }), + ) + .digest("hex"); + + const serviceName = input.mode === "watch" ? "paperclip-dev-watch" : "paperclip-dev-once"; + const serviceKey = createLocalServiceKey({ + profileKind: "paperclip-dev", + serviceName, + cwd: repoRoot, + command: "dev-runner.ts", + envFingerprint, + port: input.port, + scope: { + repoRoot, + mode: input.mode, + }, + }); + + return { + serviceKey, + serviceName, + envFingerprint, + }; +} diff --git a/scripts/dev-service.ts b/scripts/dev-service.ts new file mode 100644 index 00000000..978607ec --- /dev/null +++ b/scripts/dev-service.ts @@ -0,0 +1,44 @@ +#!/usr/bin/env -S node --import tsx +import { listLocalServiceRegistryRecords, removeLocalServiceRegistryRecord, terminateLocalService } from "../server/src/services/local-service-supervisor.ts"; +import { repoRoot } from "./dev-service-profile.ts"; + +function toDisplayLines(records: Awaited>) { + return records.map((record) => { + const childPid = typeof record.metadata?.childPid === "number" ? ` child=${record.metadata.childPid}` : ""; + const url = typeof record.metadata?.url === "string" ? ` url=${record.metadata.url}` : ""; + return `${record.serviceName} pid=${record.pid}${childPid} cwd=${record.cwd}${url}`; + }); +} + +const command = process.argv[2] ?? "list"; +const records = await listLocalServiceRegistryRecords({ + profileKind: "paperclip-dev", + metadata: { repoRoot }, +}); + +if (command === "list") { + if (records.length === 0) { + console.log("No Paperclip dev services registered for this repo."); + process.exit(0); + } + for (const line of toDisplayLines(records)) { + console.log(line); + } + process.exit(0); +} + +if (command === "stop") { + if (records.length === 0) { + console.log("No Paperclip dev services registered for this repo."); + process.exit(0); + } + for (const record of records) { + await terminateLocalService(record); + await removeLocalServiceRegistryRecord(record.serviceKey); + console.log(`Stopped ${record.serviceName} (pid ${record.pid})`); + } + process.exit(0); +} + +console.error(`Unknown dev-service command: ${command}`); +process.exit(1); diff --git a/server/src/__tests__/workspace-runtime.test.ts b/server/src/__tests__/workspace-runtime.test.ts index 6a55a72b..92be7642 100644 --- a/server/src/__tests__/workspace-runtime.test.ts +++ b/server/src/__tests__/workspace-runtime.test.ts @@ -1,25 +1,48 @@ import { execFile } from "node:child_process"; +import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { fileURLToPath } from "node:url"; import { promisify } from "node:util"; -import { afterEach, describe, expect, it } from "vitest"; +import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; +import { + agents, + companies, + createDb, + heartbeatRuns, + workspaceRuntimeServices, +} from "@paperclipai/db"; +import { eq } from "drizzle-orm"; import { cleanupExecutionWorkspaceArtifacts, ensureRuntimeServicesForRun, normalizeAdapterManagedRuntimeServices, + reconcilePersistedRuntimeServicesOnStartup, realizeExecutionWorkspace, releaseRuntimeServicesForRun, + resetRuntimeServicesForTests, stopRuntimeServicesForExecutionWorkspace, type RealizedExecutionWorkspace, } from "../services/workspace-runtime.ts"; import { resolvePaperclipConfigPath } from "../paths.ts"; import type { WorkspaceOperation } from "@paperclipai/shared"; import type { WorkspaceOperationRecorder } from "../services/workspace-operations.ts"; +import { + getEmbeddedPostgresTestSupport, + startEmbeddedPostgresTestDatabase, +} from "./helpers/embedded-postgres.js"; const execFileAsync = promisify(execFile); const leasedRunIds = new Set(); +const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); +const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; + +if (!embeddedPostgresSupport.supported) { + console.warn( + `Skipping embedded Postgres workspace-runtime tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, + ); +} async function runGit(cwd: string, args: string[]) { await execFileAsync("git", args, { cwd }); @@ -128,6 +151,7 @@ afterEach(async () => { delete process.env.PAPERCLIP_INSTANCE_ID; delete process.env.PAPERCLIP_WORKTREES_DIR; delete process.env.DATABASE_URL; + await resetRuntimeServicesForTests(); }); describe("realizeExecutionWorkspace", () => { @@ -1028,6 +1052,135 @@ describe("ensureRuntimeServicesForRun", () => { }); }); +describeEmbeddedPostgres("workspace runtime startup reconciliation", () => { + let db!: ReturnType; + let tempDb: Awaited> | null = null; + + beforeAll(async () => { + tempDb = await startEmbeddedPostgresTestDatabase("paperclip-workspace-runtime-"); + db = createDb(tempDb.connectionString); + }, 20_000); + + afterAll(async () => { + await tempDb?.cleanup(); + }); + + afterEach(async () => { + await db.delete(workspaceRuntimeServices); + await db.delete(heartbeatRuns); + await db.delete(agents); + await db.delete(companies); + }); + + it("adopts a live auto-port shared service after runtime state is reset", async () => { + const workspaceRoot = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-runtime-reconcile-")); + const paperclipHome = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-runtime-home-")); + process.env.PAPERCLIP_HOME = paperclipHome; + process.env.PAPERCLIP_INSTANCE_ID = `runtime-reconcile-${randomUUID()}`; + + const companyId = randomUUID(); + const agentId = randomUUID(); + const runId = randomUUID(); + const executionWorkspaceId = randomUUID(); + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + await db.insert(agents).values({ + id: agentId, + companyId, + name: "Codex Coder", + role: "engineer", + status: "active", + adapterType: "codex_local", + adapterConfig: {}, + runtimeConfig: {}, + permissions: {}, + }); + await db.insert(heartbeatRuns).values({ + id: runId, + companyId, + agentId, + invocationSource: "manual", + status: "running", + startedAt: new Date(), + updatedAt: new Date(), + }); + + const workspace = { + ...buildWorkspace(workspaceRoot), + projectId: null, + workspaceId: null, + }; + leasedRunIds.add(runId); + + const services = await ensureRuntimeServicesForRun({ + db, + runId, + agent: { + id: agentId, + name: "Codex Coder", + companyId, + }, + issue: null, + workspace, + config: { + workspaceRuntime: { + services: [ + { + name: "web", + command: + "node -e \"require('node:http').createServer((req,res)=>res.end('ok')).listen(Number(process.env.PORT), '127.0.0.1')\"", + port: { type: "auto" }, + readiness: { + type: "http", + urlTemplate: "http://127.0.0.1:{{port}}", + timeoutSec: 10, + intervalMs: 100, + }, + lifecycle: "shared", + reuseScope: "agent", + stopPolicy: { + type: "manual", + }, + }, + ], + }, + }, + adapterEnv: {}, + }); + + expect(services).toHaveLength(1); + const service = services[0]; + expect(service?.url).toMatch(/^http:\/\/127\.0\.0\.1:\d+$/); + await expect(fetch(service!.url!)).resolves.toMatchObject({ ok: true }); + + await resetRuntimeServicesForTests(); + + const result = await reconcilePersistedRuntimeServicesOnStartup(db); + expect(result).toMatchObject({ reconciled: 1, adopted: 1, stopped: 0 }); + + const persisted = await db + .select() + .from(workspaceRuntimeServices) + .where(eq(workspaceRuntimeServices.id, service!.id)) + .then((rows) => rows[0] ?? null); + expect(persisted?.status).toBe("running"); + expect(persisted?.providerRef).toMatch(/^\d+$/); + + await stopRuntimeServicesForExecutionWorkspace({ + db, + executionWorkspaceId, + workspaceCwd: workspace.cwd, + }); + + await expect(fetch(service!.url!)).rejects.toThrow(); + }); +}); + describe("normalizeAdapterManagedRuntimeServices", () => { it("fills workspace defaults and derives stable ids for adapter-managed services", () => { const workspace = buildWorkspace("/tmp/project"); diff --git a/server/src/services/local-service-supervisor.ts b/server/src/services/local-service-supervisor.ts new file mode 100644 index 00000000..68dbbdc8 --- /dev/null +++ b/server/src/services/local-service-supervisor.ts @@ -0,0 +1,302 @@ +import { execFile } from "node:child_process"; +import { createHash } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { setTimeout as delay } from "node:timers/promises"; +import { promisify } from "node:util"; +import { resolvePaperclipInstanceRoot } from "../home-paths.js"; + +const execFileAsync = promisify(execFile); + +export interface LocalServiceRegistryRecord { + version: 1; + serviceKey: string; + profileKind: string; + serviceName: string; + command: string; + cwd: string; + envFingerprint: string; + port: number | null; + url: string | null; + pid: number; + processGroupId: number | null; + provider: "local_process"; + runtimeServiceId: string | null; + reuseKey: string | null; + startedAt: string; + lastSeenAt: string; + metadata: Record | null; +} + +export interface LocalServiceIdentityInput { + profileKind: string; + serviceName: string; + cwd: string; + command: string; + envFingerprint: string; + port: number | null; + scope: Record | null; +} + +function stableStringify(value: unknown): string { + if (Array.isArray(value)) { + return `[${value.map((entry) => stableStringify(entry)).join(",")}]`; + } + if (value && typeof value === "object") { + const rec = value as Record; + return `{${Object.keys(rec).sort().map((key) => `${JSON.stringify(key)}:${stableStringify(rec[key])}`).join(",")}}`; + } + return JSON.stringify(value); +} + +function sanitizeServiceKeySegment(value: string, fallback: string): string { + const normalized = value + .trim() + .toLowerCase() + .replace(/[^a-z0-9._-]+/g, "-") + .replace(/-+/g, "-") + .replace(/^-+|-+$/g, ""); + return normalized || fallback; +} + +function getRuntimeServicesDir() { + return path.resolve(resolvePaperclipInstanceRoot(), "runtime-services"); +} + +function getRuntimeServiceRegistryPath(serviceKey: string) { + return path.resolve(getRuntimeServicesDir(), `${serviceKey}.json`); +} + +function normalizeRegistryRecord(raw: unknown): LocalServiceRegistryRecord | null { + if (!raw || typeof raw !== "object") return null; + const rec = raw as Record; + if ( + rec.version !== 1 || + typeof rec.serviceKey !== "string" || + typeof rec.profileKind !== "string" || + typeof rec.serviceName !== "string" || + typeof rec.command !== "string" || + typeof rec.cwd !== "string" || + typeof rec.envFingerprint !== "string" || + typeof rec.pid !== "number" + ) { + return null; + } + + return { + version: 1, + serviceKey: rec.serviceKey, + profileKind: rec.profileKind, + serviceName: rec.serviceName, + command: rec.command, + cwd: rec.cwd, + envFingerprint: rec.envFingerprint, + port: typeof rec.port === "number" ? rec.port : null, + url: typeof rec.url === "string" ? rec.url : null, + pid: rec.pid, + processGroupId: typeof rec.processGroupId === "number" ? rec.processGroupId : null, + provider: "local_process", + runtimeServiceId: typeof rec.runtimeServiceId === "string" ? rec.runtimeServiceId : null, + reuseKey: typeof rec.reuseKey === "string" ? rec.reuseKey : null, + startedAt: typeof rec.startedAt === "string" ? rec.startedAt : new Date().toISOString(), + lastSeenAt: typeof rec.lastSeenAt === "string" ? rec.lastSeenAt : new Date().toISOString(), + metadata: + rec.metadata && typeof rec.metadata === "object" && !Array.isArray(rec.metadata) + ? (rec.metadata as Record) + : null, + }; +} + +async function safeReadRegistryRecord(filePath: string) { + try { + const raw = JSON.parse(await fs.readFile(filePath, "utf8")) as unknown; + return normalizeRegistryRecord(raw); + } catch { + return null; + } +} + +export function createLocalServiceKey(input: LocalServiceIdentityInput) { + const digest = createHash("sha256") + .update( + stableStringify({ + profileKind: input.profileKind, + serviceName: input.serviceName, + cwd: path.resolve(input.cwd), + command: input.command, + envFingerprint: input.envFingerprint, + port: input.port, + scope: input.scope ?? null, + }), + ) + .digest("hex") + .slice(0, 24); + + return `${sanitizeServiceKeySegment(input.profileKind, "service")}-${sanitizeServiceKeySegment(input.serviceName, "service")}-${digest}`; +} + +export async function writeLocalServiceRegistryRecord(record: LocalServiceRegistryRecord) { + await fs.mkdir(getRuntimeServicesDir(), { recursive: true }); + await fs.writeFile( + getRuntimeServiceRegistryPath(record.serviceKey), + `${JSON.stringify(record, null, 2)}\n`, + "utf8", + ); +} + +export async function removeLocalServiceRegistryRecord(serviceKey: string) { + await fs.rm(getRuntimeServiceRegistryPath(serviceKey), { force: true }); +} + +export async function readLocalServiceRegistryRecord(serviceKey: string) { + return await safeReadRegistryRecord(getRuntimeServiceRegistryPath(serviceKey)); +} + +export async function listLocalServiceRegistryRecords(filter?: { + profileKind?: string; + metadata?: Record; +}) { + try { + const entries = await fs.readdir(getRuntimeServicesDir(), { withFileTypes: true }); + const records = await Promise.all( + entries + .filter((entry) => entry.isFile() && entry.name.endsWith(".json")) + .map((entry) => safeReadRegistryRecord(path.resolve(getRuntimeServicesDir(), entry.name))), + ); + + return records + .filter((record): record is LocalServiceRegistryRecord => record !== null) + .filter((record) => { + if (filter?.profileKind && record.profileKind !== filter.profileKind) return false; + if (!filter?.metadata) return true; + return Object.entries(filter.metadata).every(([key, value]) => record.metadata?.[key] === value); + }) + .sort((left, right) => left.serviceKey.localeCompare(right.serviceKey)); + } catch { + return []; + } +} + +export async function findLocalServiceRegistryRecordByRuntimeServiceId(input: { + runtimeServiceId: string; + profileKind?: string; +}) { + const records = await listLocalServiceRegistryRecords( + input.profileKind ? { profileKind: input.profileKind } : undefined, + ); + return records.find((record) => record.runtimeServiceId === input.runtimeServiceId) ?? null; +} + +export function isPidAlive(pid: number) { + if (!Number.isInteger(pid) || pid <= 0) return false; + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +async function isLikelyMatchingCommand(record: LocalServiceRegistryRecord) { + if (process.platform === "win32") return true; + try { + const { stdout } = await execFileAsync("ps", ["-o", "command=", "-p", String(record.pid)]); + const commandLine = stdout.trim(); + if (!commandLine) return false; + return commandLine.includes(record.command) || commandLine.includes(record.serviceName); + } catch { + return true; + } +} + +export async function findAdoptableLocalService(input: { + serviceKey: string; + command?: string | null; + cwd?: string | null; + envFingerprint?: string | null; + port?: number | null; +}) { + const record = await readLocalServiceRegistryRecord(input.serviceKey); + if (!record) return null; + + if (!isPidAlive(record.pid)) { + await removeLocalServiceRegistryRecord(input.serviceKey); + return null; + } + if (!(await isLikelyMatchingCommand(record))) { + await removeLocalServiceRegistryRecord(input.serviceKey); + return null; + } + if (input.command && record.command !== input.command) return null; + if (input.cwd && path.resolve(record.cwd) !== path.resolve(input.cwd)) return null; + if (input.envFingerprint && record.envFingerprint !== input.envFingerprint) return null; + if (input.port !== undefined && input.port !== null && record.port !== input.port) return null; + return record; +} + +export async function touchLocalServiceRegistryRecord( + serviceKey: string, + patch?: Partial>, +) { + const existing = await readLocalServiceRegistryRecord(serviceKey); + if (!existing) return null; + const next: LocalServiceRegistryRecord = { + ...existing, + ...patch, + version: 1, + serviceKey, + lastSeenAt: patch?.lastSeenAt ?? new Date().toISOString(), + }; + await writeLocalServiceRegistryRecord(next); + return next; +} + +export async function terminateLocalService( + record: Pick, + opts?: { signal?: NodeJS.Signals; forceAfterMs?: number }, +) { + const signal = opts?.signal ?? "SIGTERM"; + const targetProcessGroup = process.platform !== "win32" && record.processGroupId && record.processGroupId > 0; + try { + if (targetProcessGroup) { + process.kill(-record.processGroupId!, signal); + } else { + process.kill(record.pid, signal); + } + } catch { + return; + } + + const deadline = Date.now() + (opts?.forceAfterMs ?? 2_000); + while (Date.now() < deadline) { + if (!isPidAlive(record.pid)) { + return; + } + await delay(100); + } + + if (!isPidAlive(record.pid)) return; + try { + if (targetProcessGroup) { + process.kill(-record.processGroupId!, "SIGKILL"); + } else { + process.kill(record.pid, "SIGKILL"); + } + } catch { + // Ignore cleanup races. + } +} + +export async function readLocalServicePortOwner(port: number) { + if (!Number.isInteger(port) || port <= 0 || process.platform === "win32") return null; + try { + const { stdout } = await execFileAsync("lsof", ["-nPiTCP", `:${port}`, "-sTCP:LISTEN", "-t"]); + const firstPid = stdout + .split("\n") + .map((line) => Number.parseInt(line.trim(), 10)) + .find((value) => Number.isInteger(value) && value > 0); + return firstPid ?? null; + } catch { + return null; + } +} diff --git a/server/src/services/workspace-runtime.ts b/server/src/services/workspace-runtime.ts index 12375701..9991a66d 100644 --- a/server/src/services/workspace-runtime.ts +++ b/server/src/services/workspace-runtime.ts @@ -10,6 +10,16 @@ import { workspaceRuntimeServices } from "@paperclipai/db"; import { and, desc, eq, inArray } from "drizzle-orm"; import { asNumber, asString, parseObject, renderTemplate } from "../adapters/utils.js"; import { resolveHomeAwarePath } from "../home-paths.js"; +import { + createLocalServiceKey, + findLocalServiceRegistryRecordByRuntimeServiceId, + findAdoptableLocalService, + readLocalServicePortOwner, + removeLocalServiceRegistryRecord, + terminateLocalService, + touchLocalServiceRegistryRecord, + writeLocalServiceRegistryRecord, +} from "./local-service-supervisor.js"; import type { WorkspaceOperationRecorder } from "./workspace-operations.js"; export interface ExecutionWorkspaceInput { @@ -77,12 +87,24 @@ interface RuntimeServiceRecord extends RuntimeServiceRef { leaseRunIds: Set; idleTimer: ReturnType | null; envFingerprint: string; + serviceKey: string; + profileKind: string; + processGroupId: number | null; } const runtimeServicesById = new Map(); const runtimeServicesByReuseKey = new Map(); const runtimeServiceLeasesByRun = new Map(); +export async function resetRuntimeServicesForTests() { + for (const record of runtimeServicesById.values()) { + clearIdleTimer(record); + } + runtimeServicesById.clear(); + runtimeServicesByReuseKey.clear(); + runtimeServiceLeasesByRun.clear(); +} + function stableStringify(value: unknown): string { if (Array.isArray(value)) { return `[${value.map((entry) => stableStringify(entry)).join(",")}]`; @@ -1101,8 +1123,17 @@ async function startLocalRuntimeService(input: { if (!command) throw new Error(`Runtime service "${serviceName}" is missing command`); const serviceCwdTemplate = asString(input.service.cwd, "."); const portConfig = parseObject(input.service.port); - const port = asString(portConfig.type, "") === "auto" ? await allocatePort() : null; const envConfig = parseObject(input.service.env); + const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex"); + const serviceIdentityFingerprint = input.reuseKey ?? envFingerprint; + const explicitPort = asNumber(portConfig.value, asNumber(input.service.port, 0)); + const identityPort = explicitPort > 0 ? explicitPort : null; + const port = + asString(portConfig.type, "") === "auto" + ? await allocatePort() + : explicitPort > 0 + ? explicitPort + : null; const templateData = buildTemplateData({ workspace: input.workspace, agent: input.agent, @@ -1124,6 +1155,80 @@ async function startLocalRuntimeService(input: { const portEnvKey = asString(portConfig.envKey, "PORT"); env[portEnvKey] = String(port); } + const expose = parseObject(input.service.expose); + const readiness = parseObject(input.service.readiness); + const urlTemplate = + asString(expose.urlTemplate, "") || + asString(readiness.urlTemplate, ""); + const url = urlTemplate ? renderTemplate(urlTemplate, templateData) : null; + const stopPolicy = parseObject(input.service.stopPolicy); + const serviceKey = createLocalServiceKey({ + profileKind: "workspace-runtime", + serviceName, + cwd: serviceCwd, + command, + envFingerprint: serviceIdentityFingerprint, + port: identityPort, + scope: { + scopeType: input.scopeType, + scopeId: input.scopeId, + executionWorkspaceId: input.executionWorkspaceId ?? null, + reuseKey: input.reuseKey, + }, + }); + const adoptedRecord = await findAdoptableLocalService({ + serviceKey, + command, + cwd: serviceCwd, + envFingerprint: serviceIdentityFingerprint, + port: identityPort, + }); + if (adoptedRecord) { + return { + id: adoptedRecord.runtimeServiceId ?? randomUUID(), + companyId: input.agent.companyId, + projectId: input.workspace.projectId, + projectWorkspaceId: input.workspace.workspaceId, + executionWorkspaceId: input.executionWorkspaceId ?? null, + issueId: input.issue?.id ?? null, + serviceName, + status: "running", + lifecycle, + scopeType: input.scopeType, + scopeId: input.scopeId, + reuseKey: input.reuseKey, + command, + cwd: serviceCwd, + port: adoptedRecord.port ?? port, + url: adoptedRecord.url ?? url, + provider: "local_process", + providerRef: String(adoptedRecord.pid), + ownerAgentId: input.agent.id, + startedByRunId: input.runId, + lastUsedAt: new Date().toISOString(), + startedAt: adoptedRecord.startedAt, + stoppedAt: null, + stopPolicy, + healthStatus: "healthy", + reused: true, + db: input.db, + child: null, + leaseRunIds: new Set([input.runId]), + idleTimer: null, + envFingerprint, + serviceKey, + profileKind: "workspace-runtime", + processGroupId: adoptedRecord.processGroupId ?? null, + }; + } + if (identityPort) { + const ownerPid = await readLocalServicePortOwner(identityPort); + if (ownerPid) { + throw new Error( + `Runtime service "${serviceName}" could not start because port ${identityPort} is already in use by pid ${ownerPid}`, + ); + } + } const shell = process.env.SHELL?.trim() || "/bin/sh"; const child = spawn(shell, ["-lc", command], { cwd: serviceCwd, @@ -1144,13 +1249,6 @@ async function startLocalRuntimeService(input: { if (input.onLog) await input.onLog("stderr", `[service:${serviceName}] ${text}`); }); - const expose = parseObject(input.service.expose); - const readiness = parseObject(input.service.readiness); - const urlTemplate = - asString(expose.urlTemplate, "") || - asString(readiness.urlTemplate, ""); - const url = urlTemplate ? renderTemplate(urlTemplate, templateData) : null; - try { await waitForReadiness({ service: input.service, url }); } catch (err) { @@ -1160,8 +1258,7 @@ async function startLocalRuntimeService(input: { ); } - const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex"); - return { + const record: RuntimeServiceRecord = { id: randomUUID(), companyId: input.agent.companyId, projectId: input.workspace.projectId, @@ -1185,7 +1282,7 @@ async function startLocalRuntimeService(input: { lastUsedAt: new Date().toISOString(), startedAt: new Date().toISOString(), stoppedAt: null, - stopPolicy: parseObject(input.service.stopPolicy), + stopPolicy, healthStatus: "healthy", reused: false, db: input.db, @@ -1193,7 +1290,41 @@ async function startLocalRuntimeService(input: { leaseRunIds: new Set([input.runId]), idleTimer: null, envFingerprint, + serviceKey, + profileKind: "workspace-runtime", + processGroupId: child.pid ?? null, }; + + if (child.pid) { + await writeLocalServiceRegistryRecord({ + version: 1, + serviceKey, + profileKind: "workspace-runtime", + serviceName, + command, + cwd: serviceCwd, + envFingerprint: serviceIdentityFingerprint, + port, + url, + pid: child.pid, + processGroupId: child.pid, + provider: "local_process", + runtimeServiceId: record.id, + reuseKey: input.reuseKey, + startedAt: record.startedAt, + lastSeenAt: record.lastUsedAt, + metadata: { + projectId: record.projectId, + projectWorkspaceId: record.projectWorkspaceId, + executionWorkspaceId: record.executionWorkspaceId, + issueId: record.issueId, + scopeType: record.scopeType, + scopeId: record.scopeId, + }, + }); + } + + return record; } function scheduleIdleStop(record: RuntimeServiceRecord) { @@ -1215,11 +1346,20 @@ async function stopRuntimeService(serviceId: string) { record.stoppedAt = new Date().toISOString(); if (record.child && record.child.pid) { terminateChildProcess(record.child); + } else if (record.providerRef) { + const pid = Number.parseInt(record.providerRef, 10); + if (Number.isInteger(pid) && pid > 0) { + await terminateLocalService({ + pid, + processGroupId: record.processGroupId, + }); + } } runtimeServicesById.delete(serviceId); if (record.reuseKey) { runtimeServicesByReuseKey.delete(record.reuseKey); } + await removeLocalServiceRegistryRecord(record.serviceKey); await persistRuntimeServiceRecord(record.db, record); } @@ -1264,6 +1404,7 @@ function registerRuntimeService(db: Db | undefined, record: RuntimeServiceRecord if (current.reuseKey && runtimeServicesByReuseKey.get(current.reuseKey) === current.id) { runtimeServicesByReuseKey.delete(current.reuseKey); } + void removeLocalServiceRegistryRecord(current.serviceKey); void persistRuntimeServiceRecord(db, current); }); } @@ -1314,6 +1455,10 @@ export async function ensureRuntimeServicesForRun(input: { existing.lastUsedAt = new Date().toISOString(); existing.stoppedAt = null; clearIdleTimer(existing); + void touchLocalServiceRegistryRecord(existing.serviceKey, { + runtimeServiceId: existing.id, + lastSeenAt: existing.lastUsedAt, + }); await persistRuntimeServiceRecord(input.db, existing); acquiredServiceIds.push(existing.id); refs.push(toRuntimeServiceRef(existing, { reused: true })); @@ -1426,8 +1571,8 @@ export async function listWorkspaceRuntimeServicesForProjectWorkspaces( } export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) { - const staleRows = await db - .select({ id: workspaceRuntimeServices.id }) + const rows = await db + .select() .from(workspaceRuntimeServices) .where( and( @@ -1436,26 +1581,84 @@ export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) { ), ); - if (staleRows.length === 0) return { reconciled: 0 }; + if (rows.length === 0) return { reconciled: 0, adopted: 0, stopped: 0 }; - const now = new Date(); - await db - .update(workspaceRuntimeServices) - .set({ - status: "stopped", - healthStatus: "unknown", - stoppedAt: now, - lastUsedAt: now, - updatedAt: now, - }) - .where( - and( - eq(workspaceRuntimeServices.provider, "local_process"), - inArray(workspaceRuntimeServices.status, ["starting", "running"]), - ), - ); + let adopted = 0; + let stopped = 0; + for (const row of rows) { + const adoptedRecord = await findLocalServiceRegistryRecordByRuntimeServiceId({ + runtimeServiceId: row.id, + profileKind: "workspace-runtime", + }); + if (adoptedRecord) { + const record: RuntimeServiceRecord = { + id: row.id, + companyId: row.companyId, + projectId: row.projectId ?? null, + projectWorkspaceId: row.projectWorkspaceId ?? null, + executionWorkspaceId: row.executionWorkspaceId ?? null, + issueId: row.issueId ?? null, + serviceName: row.serviceName, + status: "running", + lifecycle: row.lifecycle as RuntimeServiceRecord["lifecycle"], + scopeType: row.scopeType as RuntimeServiceRecord["scopeType"], + scopeId: row.scopeId ?? null, + reuseKey: row.reuseKey ?? null, + command: row.command ?? null, + cwd: row.cwd ?? null, + port: adoptedRecord.port ?? row.port ?? null, + url: adoptedRecord.url ?? row.url ?? null, + provider: "local_process", + providerRef: String(adoptedRecord.pid), + ownerAgentId: row.ownerAgentId ?? null, + startedByRunId: row.startedByRunId ?? null, + lastUsedAt: new Date().toISOString(), + startedAt: row.startedAt.toISOString(), + stoppedAt: null, + stopPolicy: (row.stopPolicy as Record | null) ?? null, + healthStatus: "healthy", + reused: true, + db, + child: null, + leaseRunIds: new Set(), + idleTimer: null, + envFingerprint: row.reuseKey ?? "", + serviceKey: adoptedRecord.serviceKey, + profileKind: "workspace-runtime", + processGroupId: adoptedRecord.processGroupId ?? null, + }; + registerRuntimeService(db, record); + await touchLocalServiceRegistryRecord(adoptedRecord.serviceKey, { + runtimeServiceId: row.id, + lastSeenAt: record.lastUsedAt, + }); + await persistRuntimeServiceRecord(db, record); + adopted += 1; + continue; + } - return { reconciled: staleRows.length }; + const now = new Date(); + await db + .update(workspaceRuntimeServices) + .set({ + status: "stopped", + healthStatus: "unknown", + stoppedAt: now, + lastUsedAt: now, + updatedAt: now, + }) + .where(eq(workspaceRuntimeServices.id, row.id)); + const registryRecord = await findLocalServiceRegistryRecordByRuntimeServiceId({ + runtimeServiceId: row.id, + profileKind: "workspace-runtime", + }); + if (registryRecord) { + await removeLocalServiceRegistryRecord(registryRecord.serviceKey); + } + stopped += 1; + } + + return { reconciled: rows.length, adopted, stopped }; } export async function persistAdapterManagedRuntimeServices(input: {