Fix execution workspace runtime control reuse
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
2fea39b814
commit
e3f07aad55
2 changed files with 347 additions and 33 deletions
|
|
@ -10,7 +10,9 @@ import {
|
|||
agents,
|
||||
companies,
|
||||
createDb,
|
||||
executionWorkspaces,
|
||||
heartbeatRuns,
|
||||
projects,
|
||||
workspaceRuntimeServices,
|
||||
} from "@paperclipai/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
|
|
@ -880,6 +882,101 @@ describe("ensureRuntimeServicesForRun", () => {
|
|||
expect(third[0]?.id).not.toBe(first[0]?.id);
|
||||
});
|
||||
|
||||
it("does not reuse project-scoped shared services across different workspace launch contexts", async () => {
|
||||
const primaryWorkspaceRoot = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-runtime-primary-"));
|
||||
const worktreeWorkspaceRoot = path.join(primaryWorkspaceRoot, ".paperclip", "worktrees", "PAP-874-chat-speed-issues");
|
||||
await fs.mkdir(worktreeWorkspaceRoot, { recursive: true });
|
||||
|
||||
const primaryWorkspace = buildWorkspace(primaryWorkspaceRoot);
|
||||
const executionWorkspace: RealizedExecutionWorkspace = {
|
||||
...buildWorkspace(worktreeWorkspaceRoot),
|
||||
source: "task_session",
|
||||
strategy: "git_worktree",
|
||||
cwd: worktreeWorkspaceRoot,
|
||||
branchName: "PAP-874-chat-speed-issues",
|
||||
worktreePath: worktreeWorkspaceRoot,
|
||||
};
|
||||
const serviceCommand =
|
||||
"node -e \"require('node:http').createServer((req,res)=>res.end(process.env.PAPERCLIP_HOME)).listen(Number(process.env.PORT), '127.0.0.1')\"";
|
||||
const config = {
|
||||
workspaceRuntime: {
|
||||
services: [
|
||||
{
|
||||
name: "paperclip-dev",
|
||||
command: serviceCommand,
|
||||
cwd: ".",
|
||||
env: {
|
||||
PAPERCLIP_HOME: "{{workspace.cwd}}/.paperclip/runtime-services",
|
||||
},
|
||||
port: { type: "auto" },
|
||||
readiness: {
|
||||
type: "http",
|
||||
urlTemplate: "http://127.0.0.1:{{port}}",
|
||||
timeoutSec: 10,
|
||||
intervalMs: 100,
|
||||
},
|
||||
expose: {
|
||||
type: "url",
|
||||
urlTemplate: "http://127.0.0.1:{{port}}",
|
||||
},
|
||||
lifecycle: "shared",
|
||||
reuseScope: "project_workspace",
|
||||
stopPolicy: {
|
||||
type: "on_run_finish",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
|
||||
const primaryRunId = "run-project-workspace";
|
||||
const executionRunId = "run-execution-workspace";
|
||||
leasedRunIds.add(primaryRunId);
|
||||
leasedRunIds.add(executionRunId);
|
||||
|
||||
const primaryServices = await ensureRuntimeServicesForRun({
|
||||
runId: primaryRunId,
|
||||
agent: {
|
||||
id: "agent-1",
|
||||
name: "Codex Coder",
|
||||
companyId: "company-1",
|
||||
},
|
||||
issue: null,
|
||||
workspace: primaryWorkspace,
|
||||
config,
|
||||
adapterEnv: {},
|
||||
});
|
||||
|
||||
const executionServices = await ensureRuntimeServicesForRun({
|
||||
runId: executionRunId,
|
||||
agent: {
|
||||
id: "agent-1",
|
||||
name: "Codex Coder",
|
||||
companyId: "company-1",
|
||||
},
|
||||
issue: null,
|
||||
workspace: executionWorkspace,
|
||||
executionWorkspaceId: "execution-workspace-1",
|
||||
config,
|
||||
adapterEnv: {},
|
||||
});
|
||||
|
||||
expect(primaryServices).toHaveLength(1);
|
||||
expect(executionServices).toHaveLength(1);
|
||||
expect(primaryServices[0]?.reused).toBe(false);
|
||||
expect(executionServices[0]?.reused).toBe(false);
|
||||
expect(executionServices[0]?.id).not.toBe(primaryServices[0]?.id);
|
||||
expect(executionServices[0]?.executionWorkspaceId).toBe("execution-workspace-1");
|
||||
expect(executionServices[0]?.cwd).toBe(worktreeWorkspaceRoot);
|
||||
expect(executionServices[0]?.url).not.toBe(primaryServices[0]?.url);
|
||||
|
||||
const primaryResponse = await fetch(primaryServices[0]!.url!);
|
||||
expect(await primaryResponse.text()).toBe(path.join(primaryWorkspaceRoot, ".paperclip", "runtime-services"));
|
||||
|
||||
const executionResponse = await fetch(executionServices[0]!.url!);
|
||||
expect(await executionResponse.text()).toBe(path.join(worktreeWorkspaceRoot, ".paperclip", "runtime-services"));
|
||||
});
|
||||
|
||||
it("does not leak parent Paperclip instance env into runtime service commands", async () => {
|
||||
const workspaceRoot = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-runtime-env-"));
|
||||
const workspace = buildWorkspace(workspaceRoot);
|
||||
|
|
@ -1089,6 +1186,8 @@ describeEmbeddedPostgres("workspace runtime startup reconciliation", () => {
|
|||
|
||||
afterEach(async () => {
|
||||
await db.delete(workspaceRuntimeServices);
|
||||
await db.delete(executionWorkspaces);
|
||||
await db.delete(projects);
|
||||
await db.delete(heartbeatRuns);
|
||||
await db.delete(agents);
|
||||
await db.delete(companies);
|
||||
|
|
@ -1201,6 +1300,127 @@ describeEmbeddedPostgres("workspace runtime startup reconciliation", () => {
|
|||
|
||||
await expect(fetch(service!.url!)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("persists controlled execution workspace stops as stopped", async () => {
|
||||
const workspaceRoot = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-runtime-stop-persisted-"));
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
const projectId = randomUUID();
|
||||
const runId = randomUUID();
|
||||
const executionWorkspaceId = randomUUID();
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
await db.insert(agents).values({
|
||||
id: agentId,
|
||||
companyId,
|
||||
name: "Codex Coder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "codex_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {},
|
||||
permissions: {},
|
||||
});
|
||||
await db.insert(projects).values({
|
||||
id: projectId,
|
||||
companyId,
|
||||
name: "Runtime stop test",
|
||||
status: "active",
|
||||
});
|
||||
await db.insert(executionWorkspaces).values({
|
||||
id: executionWorkspaceId,
|
||||
companyId,
|
||||
projectId,
|
||||
mode: "isolated_workspace",
|
||||
strategyType: "git_worktree",
|
||||
name: "Execution workspace stop test",
|
||||
status: "active",
|
||||
cwd: workspaceRoot,
|
||||
providerType: "local_fs",
|
||||
providerRef: workspaceRoot,
|
||||
});
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: runId,
|
||||
companyId,
|
||||
agentId,
|
||||
invocationSource: "manual",
|
||||
status: "running",
|
||||
startedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
const workspace = {
|
||||
...buildWorkspace(workspaceRoot),
|
||||
projectId: null,
|
||||
workspaceId: null,
|
||||
};
|
||||
leasedRunIds.add(runId);
|
||||
|
||||
const services = await ensureRuntimeServicesForRun({
|
||||
db,
|
||||
runId,
|
||||
agent: {
|
||||
id: agentId,
|
||||
name: "Codex Coder",
|
||||
companyId,
|
||||
},
|
||||
issue: null,
|
||||
workspace,
|
||||
executionWorkspaceId,
|
||||
config: {
|
||||
workspaceRuntime: {
|
||||
services: [
|
||||
{
|
||||
name: "web",
|
||||
command:
|
||||
"node -e \"require('node:http').createServer((req,res)=>res.end('ok')).listen(Number(process.env.PORT), '127.0.0.1')\"",
|
||||
port: { type: "auto" },
|
||||
readiness: {
|
||||
type: "http",
|
||||
urlTemplate: "http://127.0.0.1:{{port}}",
|
||||
timeoutSec: 10,
|
||||
intervalMs: 100,
|
||||
},
|
||||
lifecycle: "shared",
|
||||
reuseScope: "execution_workspace",
|
||||
stopPolicy: {
|
||||
type: "manual",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
adapterEnv: {},
|
||||
});
|
||||
|
||||
expect(services[0]?.url).toBeTruthy();
|
||||
|
||||
await stopRuntimeServicesForExecutionWorkspace({
|
||||
db,
|
||||
executionWorkspaceId,
|
||||
workspaceCwd: workspace.cwd,
|
||||
});
|
||||
await releaseRuntimeServicesForRun(runId);
|
||||
leasedRunIds.delete(runId);
|
||||
await new Promise((resolve) => setTimeout(resolve, 250));
|
||||
|
||||
await expect(fetch(services[0]!.url!)).rejects.toThrow();
|
||||
|
||||
const persisted = await db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(eq(workspaceRuntimeServices.id, services[0]!.id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
expect(persisted?.status).toBe("stopped");
|
||||
expect(persisted?.healthStatus).toBe("unknown");
|
||||
expect(persisted?.stoppedAt).toBeTruthy();
|
||||
});
|
||||
});
|
||||
|
||||
describe("normalizeAdapterManagedRuntimeServices", () => {
|
||||
|
|
|
|||
|
|
@ -914,6 +914,88 @@ function buildTemplateData(input: {
|
|||
};
|
||||
}
|
||||
|
||||
function renderRuntimeServiceEnv(input: {
|
||||
envConfig: Record<string, unknown>;
|
||||
templateData: ReturnType<typeof buildTemplateData>;
|
||||
}) {
|
||||
const rendered: Record<string, string> = {};
|
||||
for (const [key, value] of Object.entries(input.envConfig)) {
|
||||
if (typeof value !== "string") continue;
|
||||
rendered[key] = renderTemplate(value, input.templateData);
|
||||
}
|
||||
return rendered;
|
||||
}
|
||||
|
||||
function resolveRuntimeServiceReuseIdentity(input: {
|
||||
service: Record<string, unknown>;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
adapterEnv: Record<string, string>;
|
||||
scopeType: RuntimeServiceRef["scopeType"];
|
||||
scopeId: string | null;
|
||||
}): {
|
||||
serviceName: string;
|
||||
lifecycle: RuntimeServiceRef["lifecycle"];
|
||||
command: string;
|
||||
serviceCwd: string;
|
||||
envConfig: Record<string, unknown>;
|
||||
envFingerprint: string;
|
||||
explicitPort: number;
|
||||
identityPort: number | null;
|
||||
reuseKey: string | null;
|
||||
} {
|
||||
const serviceName = asString(input.service.name, "service");
|
||||
const lifecycle = asString(input.service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
|
||||
const command = asString(input.service.command, "");
|
||||
const serviceCwdTemplate = asString(input.service.cwd, ".");
|
||||
const portConfig = parseObject(input.service.port);
|
||||
const envConfig = parseObject(input.service.env);
|
||||
const explicitPort = asNumber(portConfig.value, asNumber(input.service.port, 0));
|
||||
const identityPort = explicitPort > 0 ? explicitPort : null;
|
||||
const templateData = buildTemplateData({
|
||||
workspace: input.workspace,
|
||||
agent: input.agent,
|
||||
issue: input.issue,
|
||||
adapterEnv: input.adapterEnv,
|
||||
port: identityPort,
|
||||
});
|
||||
const serviceCwd = resolveConfiguredPath(renderTemplate(serviceCwdTemplate, templateData), input.workspace.cwd);
|
||||
const renderedEnv = renderRuntimeServiceEnv({
|
||||
envConfig,
|
||||
templateData,
|
||||
});
|
||||
const envFingerprint = createHash("sha256").update(stableStringify(renderedEnv)).digest("hex");
|
||||
const reuseKey =
|
||||
lifecycle === "shared"
|
||||
? createHash("sha256")
|
||||
.update(
|
||||
stableStringify({
|
||||
scopeType: input.scopeType,
|
||||
scopeId: input.scopeId,
|
||||
serviceName,
|
||||
command,
|
||||
cwd: serviceCwd,
|
||||
port: identityPort,
|
||||
env: renderedEnv,
|
||||
}),
|
||||
)
|
||||
.digest("hex")
|
||||
: null;
|
||||
|
||||
return {
|
||||
serviceName,
|
||||
lifecycle,
|
||||
command,
|
||||
serviceCwd,
|
||||
envConfig,
|
||||
envFingerprint,
|
||||
explicitPort,
|
||||
identityPort,
|
||||
reuseKey,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveServiceScopeId(input: {
|
||||
service: Record<string, unknown>;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
|
|
@ -1121,17 +1203,25 @@ async function startLocalRuntimeService(input: {
|
|||
scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
|
||||
scopeId: string | null;
|
||||
}): Promise<RuntimeServiceRecord> {
|
||||
const serviceName = asString(input.service.name, "service");
|
||||
const lifecycle = asString(input.service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
|
||||
const command = asString(input.service.command, "");
|
||||
const identity = resolveRuntimeServiceReuseIdentity({
|
||||
service: input.service,
|
||||
workspace: input.workspace,
|
||||
agent: input.agent,
|
||||
issue: input.issue,
|
||||
adapterEnv: input.adapterEnv,
|
||||
scopeType: input.scopeType,
|
||||
scopeId: input.scopeId,
|
||||
});
|
||||
const serviceName = identity.serviceName;
|
||||
const lifecycle = identity.lifecycle;
|
||||
const command = identity.command;
|
||||
if (!command) throw new Error(`Runtime service "${serviceName}" is missing command`);
|
||||
const serviceCwdTemplate = asString(input.service.cwd, ".");
|
||||
const portConfig = parseObject(input.service.port);
|
||||
const envConfig = parseObject(input.service.env);
|
||||
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
|
||||
const envConfig = identity.envConfig;
|
||||
const envFingerprint = identity.envFingerprint;
|
||||
const serviceIdentityFingerprint = input.reuseKey ?? envFingerprint;
|
||||
const explicitPort = asNumber(portConfig.value, asNumber(input.service.port, 0));
|
||||
const identityPort = explicitPort > 0 ? explicitPort : null;
|
||||
const explicitPort = identity.explicitPort;
|
||||
const identityPort = identity.identityPort;
|
||||
const port =
|
||||
asString(portConfig.type, "") === "auto"
|
||||
? await allocatePort()
|
||||
|
|
@ -1145,15 +1235,16 @@ async function startLocalRuntimeService(input: {
|
|||
adapterEnv: input.adapterEnv,
|
||||
port,
|
||||
});
|
||||
const serviceCwd = resolveConfiguredPath(renderTemplate(serviceCwdTemplate, templateData), input.workspace.cwd);
|
||||
const serviceCwd =
|
||||
port === identityPort
|
||||
? identity.serviceCwd
|
||||
: resolveConfiguredPath(renderTemplate(asString(input.service.cwd, "."), templateData), input.workspace.cwd);
|
||||
const env: Record<string, string> = {
|
||||
...sanitizeRuntimeServiceBaseEnv(process.env),
|
||||
...input.adapterEnv,
|
||||
} as Record<string, string>;
|
||||
for (const [key, value] of Object.entries(envConfig)) {
|
||||
if (typeof value === "string") {
|
||||
env[key] = renderTemplate(value, templateData);
|
||||
}
|
||||
for (const [key, value] of Object.entries(renderRuntimeServiceEnv({ envConfig, templateData }))) {
|
||||
env[key] = value;
|
||||
}
|
||||
if (port) {
|
||||
const portEnvKey = asString(portConfig.envKey, "PORT");
|
||||
|
|
@ -1346,8 +1437,13 @@ async function stopRuntimeService(serviceId: string) {
|
|||
if (!record) return;
|
||||
clearIdleTimer(record);
|
||||
record.status = "stopped";
|
||||
record.healthStatus = "unknown";
|
||||
record.lastUsedAt = new Date().toISOString();
|
||||
record.stoppedAt = new Date().toISOString();
|
||||
runtimeServicesById.delete(serviceId);
|
||||
if (record.reuseKey && runtimeServicesByReuseKey.get(record.reuseKey) === record.id) {
|
||||
runtimeServicesByReuseKey.delete(record.reuseKey);
|
||||
}
|
||||
if (record.child && record.child.pid) {
|
||||
await terminateLocalService({
|
||||
pid: record.child.pid,
|
||||
|
|
@ -1362,10 +1458,6 @@ async function stopRuntimeService(serviceId: string) {
|
|||
});
|
||||
}
|
||||
}
|
||||
runtimeServicesById.delete(serviceId);
|
||||
if (record.reuseKey) {
|
||||
runtimeServicesByReuseKey.delete(record.reuseKey);
|
||||
}
|
||||
await removeLocalServiceRegistryRecord(record.serviceKey);
|
||||
await persistRuntimeServiceRecord(record.db, record);
|
||||
}
|
||||
|
|
@ -1441,7 +1533,6 @@ export async function ensureRuntimeServicesForRun(input: {
|
|||
|
||||
try {
|
||||
for (const service of rawServices) {
|
||||
const lifecycle = asString(service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
|
||||
const { scopeType, scopeId } = resolveServiceScopeId({
|
||||
service,
|
||||
workspace: input.workspace,
|
||||
|
|
@ -1450,13 +1541,15 @@ export async function ensureRuntimeServicesForRun(input: {
|
|||
runId: input.runId,
|
||||
agent: input.agent,
|
||||
});
|
||||
const envConfig = parseObject(service.env);
|
||||
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
|
||||
const serviceName = asString(service.name, "service");
|
||||
const reuseKey =
|
||||
lifecycle === "shared"
|
||||
? [scopeType, scopeId ?? "", serviceName, envFingerprint].join(":")
|
||||
: null;
|
||||
const reuseKey = resolveRuntimeServiceReuseIdentity({
|
||||
service,
|
||||
workspace: input.workspace,
|
||||
agent: input.agent,
|
||||
issue: input.issue,
|
||||
adapterEnv: input.adapterEnv,
|
||||
scopeType,
|
||||
scopeId,
|
||||
}).reuseKey;
|
||||
|
||||
if (reuseKey) {
|
||||
const existingId = runtimeServicesByReuseKey.get(reuseKey);
|
||||
|
|
@ -1520,7 +1613,6 @@ export async function startRuntimeServicesForWorkspaceControl(input: {
|
|||
const invocationId = input.invocationId ?? randomUUID();
|
||||
|
||||
for (const service of rawServices) {
|
||||
const lifecycle = asString(service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
|
||||
const { scopeType, scopeId } = resolveServiceScopeId({
|
||||
service,
|
||||
workspace: input.workspace,
|
||||
|
|
@ -1529,13 +1621,15 @@ export async function startRuntimeServicesForWorkspaceControl(input: {
|
|||
runId: invocationId,
|
||||
agent: input.actor,
|
||||
});
|
||||
const envConfig = parseObject(service.env);
|
||||
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
|
||||
const serviceName = asString(service.name, "service");
|
||||
const reuseKey =
|
||||
lifecycle === "shared"
|
||||
? [scopeType, scopeId ?? "", serviceName, envFingerprint].join(":")
|
||||
: null;
|
||||
const reuseKey = resolveRuntimeServiceReuseIdentity({
|
||||
service,
|
||||
workspace: input.workspace,
|
||||
agent: input.actor,
|
||||
issue: input.issue,
|
||||
adapterEnv: input.adapterEnv,
|
||||
scopeType,
|
||||
scopeId,
|
||||
}).reuseKey;
|
||||
|
||||
if (reuseKey) {
|
||||
const existingId = runtimeServicesByReuseKey.get(reuseKey);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue