nexus/server/src/services/heartbeat.ts
Forgotten 3a91ecbae3 Improve heartbeat execution, run tracking, and agent detail display
Enhance heartbeat service with better process adapter error recovery
and run state management. Expand heartbeat-run CLI with additional
output and diagnostics. Improve AgentDetail page run history and
status display.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 13:17:03 -06:00

1716 lines
52 KiB
TypeScript

import { spawn, type ChildProcess } from "node:child_process";
import { constants as fsConstants, promises as fs } from "node:fs";
import path from "node:path";
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import {
agents,
agentRuntimeState,
agentWakeupRequests,
heartbeatRunEvents,
heartbeatRuns,
costEvents,
} from "@paperclip/db";
import { conflict, notFound } from "../errors.js";
import { logger } from "../middleware/logger.js";
import { publishLiveEvent } from "./live-events.js";
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
interface RunningProcess {
child: ChildProcess;
graceSec: number;
}
interface RunProcessResult {
exitCode: number | null;
signal: string | null;
timedOut: boolean;
stdout: string;
stderr: string;
}
interface UsageSummary {
inputTokens: number;
outputTokens: number;
cachedInputTokens?: number;
}
interface AdapterExecutionResult {
exitCode: number | null;
signal: string | null;
timedOut: boolean;
errorMessage?: string | null;
usage?: UsageSummary;
sessionId?: string | null;
provider?: string | null;
model?: string | null;
costUsd?: number | null;
resultJson?: Record<string, unknown> | null;
summary?: string | null;
clearSession?: boolean;
}
interface AdapterInvocationMeta {
adapterType: string;
command: string;
cwd?: string;
commandArgs?: string[];
env?: Record<string, string>;
prompt?: string;
context?: Record<string, unknown>;
}
interface WakeupOptions {
source?: "timer" | "assignment" | "on_demand" | "automation";
triggerDetail?: "manual" | "ping" | "callback" | "system";
reason?: string | null;
payload?: Record<string, unknown> | null;
idempotencyKey?: string | null;
requestedByActorType?: "user" | "agent" | "system";
requestedByActorId?: string | null;
contextSnapshot?: Record<string, unknown>;
}
const runningProcesses = new Map<string, RunningProcess>();
const MAX_CAPTURE_BYTES = 4 * 1024 * 1024;
const MAX_EXCERPT_BYTES = 32 * 1024;
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
const SENSITIVE_ENV_KEY = /(key|token|secret|password|passwd|authorization|cookie)/i;
function parseObject(value: unknown): Record<string, unknown> {
if (typeof value !== "object" || value === null || Array.isArray(value)) {
return {};
}
return value as Record<string, unknown>;
}
function asString(value: unknown, fallback: string): string {
return typeof value === "string" && value.length > 0 ? value : fallback;
}
function asNumber(value: unknown, fallback: number): number {
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
function asBoolean(value: unknown, fallback: boolean): boolean {
return typeof value === "boolean" ? value : fallback;
}
function asStringArray(value: unknown): string[] {
return Array.isArray(value) ? value.filter((item): item is string => typeof item === "string") : [];
}
function parseJson(value: string): Record<string, unknown> | null {
try {
return JSON.parse(value) as Record<string, unknown>;
} catch {
return null;
}
}
function appendWithCap(prev: string, chunk: string, cap = MAX_CAPTURE_BYTES) {
const combined = prev + chunk;
return combined.length > cap ? combined.slice(combined.length - cap) : combined;
}
function appendExcerpt(prev: string, chunk: string) {
return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
}
function resolvePathValue(obj: Record<string, unknown>, dottedPath: string) {
const parts = dottedPath.split(".");
let cursor: unknown = obj;
for (const part of parts) {
if (typeof cursor !== "object" || cursor === null || Array.isArray(cursor)) {
return "";
}
cursor = (cursor as Record<string, unknown>)[part];
}
if (cursor === null || cursor === undefined) return "";
if (typeof cursor === "string") return cursor;
if (typeof cursor === "number" || typeof cursor === "boolean") return String(cursor);
try {
return JSON.stringify(cursor);
} catch {
return "";
}
}
function renderTemplate(template: string, data: Record<string, unknown>) {
return template.replace(/{{\s*([a-zA-Z0-9_.-]+)\s*}}/g, (_, path) => resolvePathValue(data, path));
}
function parseCodexJsonl(stdout: string) {
let sessionId: string | null = null;
const messages: string[] = [];
const usage = {
inputTokens: 0,
cachedInputTokens: 0,
outputTokens: 0,
};
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
if (!line) continue;
const event = parseJson(line);
if (!event) continue;
const type = asString(event.type, "");
if (type === "thread.started") {
sessionId = asString(event.thread_id, sessionId ?? "") || sessionId;
continue;
}
if (type === "item.completed") {
const item = parseObject(event.item);
if (asString(item.type, "") === "agent_message") {
const text = asString(item.text, "");
if (text) messages.push(text);
}
continue;
}
if (type === "turn.completed") {
const usageObj = parseObject(event.usage);
usage.inputTokens = asNumber(usageObj.input_tokens, usage.inputTokens);
usage.cachedInputTokens = asNumber(usageObj.cached_input_tokens, usage.cachedInputTokens);
usage.outputTokens = asNumber(usageObj.output_tokens, usage.outputTokens);
}
}
return {
sessionId,
summary: messages.join("\n\n").trim(),
usage,
};
}
function describeClaudeFailure(parsed: Record<string, unknown>): string | null {
const subtype = asString(parsed.subtype, "");
const resultText = asString(parsed.result, "").trim();
const errors = extractClaudeErrorMessages(parsed);
let detail = resultText;
if (!detail && errors.length > 0) {
detail = errors[0] ?? "";
}
const parts = ["Claude run failed"];
if (subtype) parts.push(`subtype=${subtype}`);
if (detail) parts.push(detail);
return parts.length > 1 ? parts.join(": ") : null;
}
function extractClaudeErrorMessages(parsed: Record<string, unknown>): string[] {
const raw = Array.isArray(parsed.errors) ? parsed.errors : [];
const messages: string[] = [];
for (const entry of raw) {
if (typeof entry === "string") {
const msg = entry.trim();
if (msg) messages.push(msg);
continue;
}
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) {
continue;
}
const obj = entry as Record<string, unknown>;
const msg = asString(obj.message, "") || asString(obj.error, "") || asString(obj.code, "");
if (msg) {
messages.push(msg);
continue;
}
try {
messages.push(JSON.stringify(obj));
} catch {
// skip non-serializable entry
}
}
return messages;
}
function isClaudeUnknownSessionError(parsed: Record<string, unknown>): boolean {
const resultText = asString(parsed.result, "").trim();
const allMessages = [resultText, ...extractClaudeErrorMessages(parsed)]
.map((msg) => msg.trim())
.filter(Boolean);
return allMessages.some((msg) =>
/no conversation found with session id|unknown session|session .* not found/i.test(msg),
);
}
function parseClaudeStreamJson(stdout: string) {
let sessionId: string | null = null;
let model = "";
let finalResult: Record<string, unknown> | null = null;
const assistantTexts: string[] = [];
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
if (!line) continue;
const event = parseJson(line);
if (!event) continue;
const type = asString(event.type, "");
if (type === "system" && asString(event.subtype, "") === "init") {
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
model = asString(event.model, model);
continue;
}
if (type === "assistant") {
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
const message = parseObject(event.message);
const content = Array.isArray(message.content) ? message.content : [];
for (const entry of content) {
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
const block = entry as Record<string, unknown>;
if (asString(block.type, "") === "text") {
const text = asString(block.text, "");
if (text) assistantTexts.push(text);
}
}
continue;
}
if (type === "result") {
finalResult = event;
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
}
}
if (!finalResult) {
return {
sessionId,
model,
costUsd: null as number | null,
usage: null as UsageSummary | null,
summary: assistantTexts.join("\n\n").trim(),
resultJson: null as Record<string, unknown> | null,
};
}
const usageObj = parseObject(finalResult.usage);
const usage: UsageSummary = {
inputTokens: asNumber(usageObj.input_tokens, 0),
cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0),
outputTokens: asNumber(usageObj.output_tokens, 0),
};
const costRaw = finalResult.total_cost_usd;
const costUsd = typeof costRaw === "number" && Number.isFinite(costRaw) ? costRaw : null;
const summary = asString(finalResult.result, assistantTexts.join("\n\n")).trim();
return {
sessionId,
model,
costUsd,
usage,
summary,
resultJson: finalResult,
};
}
function redactEnvForLogs(env: Record<string, string>): Record<string, string> {
const redacted: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
redacted[key] = SENSITIVE_ENV_KEY.test(key) ? "***REDACTED***" : value;
}
return redacted;
}
async function runChildProcess(
runId: string,
command: string,
args: string[],
opts: {
cwd: string;
env: Record<string, string>;
timeoutSec: number;
graceSec: number;
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
},
): Promise<RunProcessResult> {
return new Promise<RunProcessResult>((resolve, reject) => {
const mergedEnv = ensurePathInEnv({ ...process.env, ...opts.env });
const child = spawn(command, args, {
cwd: opts.cwd,
env: mergedEnv,
shell: false,
stdio: ["ignore", "pipe", "pipe"],
});
runningProcesses.set(runId, { child, graceSec: opts.graceSec });
let timedOut = false;
let stdout = "";
let stderr = "";
let logChain: Promise<void> = Promise.resolve();
const timeout = setTimeout(() => {
timedOut = true;
child.kill("SIGTERM");
setTimeout(() => {
if (!child.killed) {
child.kill("SIGKILL");
}
}, Math.max(1, opts.graceSec) * 1000);
}, Math.max(1, opts.timeoutSec) * 1000);
child.stdout?.on("data", (chunk) => {
const text = String(chunk);
stdout = appendWithCap(stdout, text);
logChain = logChain
.then(() => opts.onLog("stdout", text))
.catch((err) => logger.warn({ err, runId }, "failed to append stdout log chunk"));
});
child.stderr?.on("data", (chunk) => {
const text = String(chunk);
stderr = appendWithCap(stderr, text);
logChain = logChain
.then(() => opts.onLog("stderr", text))
.catch((err) => logger.warn({ err, runId }, "failed to append stderr log chunk"));
});
child.on("error", (err) => {
clearTimeout(timeout);
runningProcesses.delete(runId);
const errno = (err as NodeJS.ErrnoException).code;
const pathValue = mergedEnv.PATH ?? mergedEnv.Path ?? "";
const msg =
errno === "ENOENT"
? `Failed to start command "${command}" in "${opts.cwd}". Verify adapter command, working directory, and PATH (${pathValue}).`
: `Failed to start command "${command}" in "${opts.cwd}": ${err.message}`;
reject(new Error(msg));
});
child.on("close", (code, signal) => {
clearTimeout(timeout);
runningProcesses.delete(runId);
void logChain.finally(() => {
resolve({
exitCode: code,
signal,
timedOut,
stdout,
stderr,
});
});
});
});
}
function buildPaperclipEnv(agent: { id: string; companyId: string }): Record<string, string> {
const vars: Record<string, string> = {
PAPERCLIP_AGENT_ID: agent.id,
PAPERCLIP_COMPANY_ID: agent.companyId,
};
const apiUrl = process.env.PAPERCLIP_API_URL ?? `http://localhost:${process.env.PORT ?? 3100}`;
vars.PAPERCLIP_API_URL = apiUrl;
return vars;
}
function defaultPathForPlatform() {
if (process.platform === "win32") {
return "C:\\Windows\\System32;C:\\Windows;C:\\Windows\\System32\\Wbem";
}
return "/usr/local/bin:/opt/homebrew/bin:/usr/local/sbin:/usr/bin:/bin:/usr/sbin:/sbin";
}
function ensurePathInEnv(env: NodeJS.ProcessEnv): NodeJS.ProcessEnv {
if (typeof env.PATH === "string" && env.PATH.length > 0) return env;
if (typeof env.Path === "string" && env.Path.length > 0) return env;
return { ...env, PATH: defaultPathForPlatform() };
}
async function ensureAbsoluteDirectory(cwd: string) {
if (!path.isAbsolute(cwd)) {
throw new Error(`Working directory must be an absolute path: "${cwd}"`);
}
let stats;
try {
stats = await fs.stat(cwd);
} catch {
throw new Error(`Working directory does not exist: "${cwd}"`);
}
if (!stats.isDirectory()) {
throw new Error(`Working directory is not a directory: "${cwd}"`);
}
}
async function ensureCommandResolvable(command: string, cwd: string, env: NodeJS.ProcessEnv) {
const hasPathSeparator = command.includes("/") || command.includes("\\");
if (hasPathSeparator) {
const absolute = path.isAbsolute(command) ? command : path.resolve(cwd, command);
try {
await fs.access(absolute, fsConstants.X_OK);
} catch {
throw new Error(`Command is not executable: "${command}" (resolved: "${absolute}")`);
}
return;
}
const pathValue = env.PATH ?? env.Path ?? "";
const delimiter = process.platform === "win32" ? ";" : ":";
const dirs = pathValue.split(delimiter).filter(Boolean);
const windowsExt = process.platform === "win32"
? (env.PATHEXT ?? ".EXE;.CMD;.BAT;.COM").split(";")
: [""];
for (const dir of dirs) {
for (const ext of windowsExt) {
const candidate = path.join(dir, process.platform === "win32" ? `${command}${ext}` : command);
try {
await fs.access(candidate, fsConstants.X_OK);
return;
} catch {
// continue scanning PATH
}
}
}
throw new Error(`Command not found in PATH: "${command}"`);
}
export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
async function getAgent(agentId: string) {
return db
.select()
.from(agents)
.where(eq(agents.id, agentId))
.then((rows) => rows[0] ?? null);
}
async function getRun(runId: string) {
return db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null);
}
async function getRuntimeState(agentId: string) {
return db
.select()
.from(agentRuntimeState)
.where(eq(agentRuntimeState.agentId, agentId))
.then((rows) => rows[0] ?? null);
}
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
const existing = await getRuntimeState(agent.id);
if (existing) return existing;
return db
.insert(agentRuntimeState)
.values({
agentId: agent.id,
companyId: agent.companyId,
adapterType: agent.adapterType,
stateJson: {},
})
.returning()
.then((rows) => rows[0]);
}
async function setRunStatus(
runId: string,
status: string,
patch?: Partial<typeof heartbeatRuns.$inferInsert>,
) {
const updated = await db
.update(heartbeatRuns)
.set({ status, ...patch, updatedAt: new Date() })
.where(eq(heartbeatRuns.id, runId))
.returning()
.then((rows) => rows[0] ?? null);
if (updated) {
publishLiveEvent({
companyId: updated.companyId,
type: "heartbeat.run.status",
payload: {
runId: updated.id,
agentId: updated.agentId,
status: updated.status,
invocationSource: updated.invocationSource,
triggerDetail: updated.triggerDetail,
error: updated.error ?? null,
errorCode: updated.errorCode ?? null,
startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null,
finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null,
},
});
}
return updated;
}
async function setWakeupStatus(
wakeupRequestId: string | null | undefined,
status: string,
patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
) {
if (!wakeupRequestId) return;
await db
.update(agentWakeupRequests)
.set({ status, ...patch, updatedAt: new Date() })
.where(eq(agentWakeupRequests.id, wakeupRequestId));
}
async function appendRunEvent(
run: typeof heartbeatRuns.$inferSelect,
seq: number,
event: {
eventType: string;
stream?: "system" | "stdout" | "stderr";
level?: "info" | "warn" | "error";
color?: string;
message?: string;
payload?: Record<string, unknown>;
},
) {
await db.insert(heartbeatRunEvents).values({
companyId: run.companyId,
runId: run.id,
agentId: run.agentId,
seq,
eventType: event.eventType,
stream: event.stream,
level: event.level,
color: event.color,
message: event.message,
payload: event.payload,
});
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.event",
payload: {
runId: run.id,
agentId: run.agentId,
seq,
eventType: event.eventType,
stream: event.stream ?? null,
level: event.level ?? null,
color: event.color ?? null,
message: event.message ?? null,
payload: event.payload ?? null,
},
});
}
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
const runtimeConfig = parseObject(agent.runtimeConfig);
const heartbeat = parseObject(runtimeConfig.heartbeat);
return {
enabled: asBoolean(heartbeat.enabled, true),
intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)),
wakeOnAssignment: asBoolean(heartbeat.wakeOnAssignment, true),
wakeOnOnDemand: asBoolean(heartbeat.wakeOnOnDemand, true),
wakeOnAutomation: asBoolean(heartbeat.wakeOnAutomation, true),
};
}
async function finalizeAgentStatus(
agentId: string,
outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
) {
const existing = await getAgent(agentId);
if (!existing) return;
if (existing.status === "paused" || existing.status === "terminated") {
return;
}
const nextStatus =
outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error";
const updated = await db
.update(agents)
.set({
status: nextStatus,
lastHeartbeatAt: new Date(),
updatedAt: new Date(),
})
.where(eq(agents.id, agentId))
.returning()
.then((rows) => rows[0] ?? null);
if (updated) {
publishLiveEvent({
companyId: updated.companyId,
type: "agent.status",
payload: {
agentId: updated.id,
status: updated.status,
lastHeartbeatAt: updated.lastHeartbeatAt
? new Date(updated.lastHeartbeatAt).toISOString()
: null,
outcome,
},
});
}
}
async function updateRuntimeState(
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
result: AdapterExecutionResult,
) {
const existing = await ensureRuntimeState(agent);
const usage = result.usage;
const inputTokens = usage?.inputTokens ?? 0;
const outputTokens = usage?.outputTokens ?? 0;
const cachedInputTokens = usage?.cachedInputTokens ?? 0;
const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));
await db
.update(agentRuntimeState)
.set({
adapterType: agent.adapterType,
sessionId: result.clearSession ? null : (result.sessionId ?? existing.sessionId),
lastRunId: run.id,
lastRunStatus: run.status,
lastError: result.errorMessage ?? null,
totalInputTokens: existing.totalInputTokens + inputTokens,
totalOutputTokens: existing.totalOutputTokens + outputTokens,
totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens,
totalCostCents: existing.totalCostCents + additionalCostCents,
updatedAt: new Date(),
})
.where(eq(agentRuntimeState.agentId, agent.id));
if (additionalCostCents > 0) {
await db.insert(costEvents).values({
companyId: agent.companyId,
agentId: agent.id,
provider: result.provider ?? "unknown",
model: result.model ?? "unknown",
inputTokens,
outputTokens,
costCents: additionalCostCents,
occurredAt: new Date(),
});
await db
.update(agents)
.set({
spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`,
updatedAt: new Date(),
})
.where(eq(agents.id, agent.id));
}
}
async function executeHttpRun(
runId: string,
agentId: string,
config: Record<string, unknown>,
context: Record<string, unknown>,
): Promise<AdapterExecutionResult> {
const url = asString(config.url, "");
if (!url) throw new Error("HTTP adapter missing url");
const method = asString(config.method, "POST");
const timeoutMs = asNumber(config.timeoutMs, 15000);
const headers = parseObject(config.headers) as Record<string, string>;
const payloadTemplate = parseObject(config.payloadTemplate);
const body = { ...payloadTemplate, agentId, runId, context };
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
const res = await fetch(url, {
method,
headers: {
"content-type": "application/json",
...headers,
},
body: JSON.stringify(body),
signal: controller.signal,
});
if (!res.ok) {
throw new Error(`HTTP invoke failed with status ${res.status}`);
}
return {
exitCode: 0,
signal: null,
timedOut: false,
summary: `HTTP ${method} ${url}`,
};
} finally {
clearTimeout(timer);
}
}
async function executeProcessRun(
runId: string,
agent: typeof agents.$inferSelect,
config: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
onMeta?: (meta: AdapterInvocationMeta) => Promise<void>,
): Promise<AdapterExecutionResult> {
const command = asString(config.command, "");
if (!command) throw new Error("Process adapter missing command");
const args = asStringArray(config.args);
const cwd = asString(config.cwd, process.cwd());
const envConfig = parseObject(config.env);
const env: Record<string, string> = { ...buildPaperclipEnv(agent) };
for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v;
}
const timeoutSec = asNumber(config.timeoutSec, 900);
const graceSec = asNumber(config.graceSec, 15);
if (onMeta) {
await onMeta({
adapterType: "process",
command,
cwd,
commandArgs: args,
env: redactEnvForLogs(env),
});
}
const proc = await runChildProcess(runId, command, args, {
cwd,
env,
timeoutSec,
graceSec,
onLog,
});
if (proc.timedOut) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: true,
errorMessage: `Timed out after ${timeoutSec}s`,
};
}
if ((proc.exitCode ?? 0) !== 0) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage: `Process exited with code ${proc.exitCode ?? -1}`,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
};
}
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
};
}
async function executeClaudeLocalRun(
runId: string,
agent: typeof agents.$inferSelect,
runtime: typeof agentRuntimeState.$inferSelect,
config: Record<string, unknown>,
context: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
onMeta?: (meta: AdapterInvocationMeta) => Promise<void>,
): Promise<AdapterExecutionResult> {
const promptTemplate = asString(
config.promptTemplate,
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
);
const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate);
const command = asString(config.command, "claude");
const model = asString(config.model, "");
const maxTurns = asNumber(config.maxTurnsPerRun, 0);
const dangerouslySkipPermissions = asBoolean(config.dangerouslySkipPermissions, false);
const cwd = asString(config.cwd, process.cwd());
await ensureAbsoluteDirectory(cwd);
const envConfig = parseObject(config.env);
const env: Record<string, string> = { ...buildPaperclipEnv(agent) };
for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v;
}
const runtimeEnv = ensurePathInEnv({ ...process.env, ...env });
await ensureCommandResolvable(command, cwd, runtimeEnv);
const timeoutSec = asNumber(config.timeoutSec, 1800);
const graceSec = asNumber(config.graceSec, 20);
const extraArgs = (() => {
const fromExtraArgs = asStringArray(config.extraArgs);
if (fromExtraArgs.length > 0) return fromExtraArgs;
return asStringArray(config.args);
})();
const sessionId = runtime.sessionId;
const template = sessionId ? promptTemplate : bootstrapTemplate;
const prompt = renderTemplate(template, {
company: { id: agent.companyId },
agent,
run: { id: runId, source: "on_demand" },
context,
});
const buildClaudeArgs = (resumeSessionId: string | null) => {
const args = ["--print", prompt, "--output-format", "stream-json", "--verbose"];
if (resumeSessionId) args.push("--resume", resumeSessionId);
if (dangerouslySkipPermissions) args.push("--dangerously-skip-permissions");
if (model) args.push("--model", model);
if (maxTurns > 0) args.push("--max-turns", String(maxTurns));
if (extraArgs.length > 0) args.push(...extraArgs);
return args;
};
const parseFallbackErrorMessage = (proc: RunProcessResult) => {
const stderrLine =
proc.stderr
.split(/\r?\n/)
.map((line) => line.trim())
.find(Boolean) ?? "";
if ((proc.exitCode ?? 0) === 0) {
return "Failed to parse claude JSON output";
}
return stderrLine
? `Claude exited with code ${proc.exitCode ?? -1}: ${stderrLine}`
: `Claude exited with code ${proc.exitCode ?? -1}`;
};
const runAttempt = async (resumeSessionId: string | null) => {
const args = buildClaudeArgs(resumeSessionId);
if (onMeta) {
await onMeta({
adapterType: "claude_local",
command,
cwd,
commandArgs: args.map((value, idx) => (idx === 1 ? `<prompt ${prompt.length} chars>` : value)),
env: redactEnvForLogs(env),
prompt,
context,
});
}
const proc = await runChildProcess(runId, command, args, {
cwd,
env,
timeoutSec,
graceSec,
onLog,
});
const parsedStream = parseClaudeStreamJson(proc.stdout);
const parsed = parsedStream.resultJson ?? parseJson(proc.stdout);
return { proc, parsedStream, parsed };
};
const toAdapterResult = (
attempt: {
proc: RunProcessResult;
parsedStream: ReturnType<typeof parseClaudeStreamJson>;
parsed: Record<string, unknown> | null;
},
opts: { fallbackSessionId: string | null; clearSessionOnMissingSession?: boolean },
): AdapterExecutionResult => {
const { proc, parsedStream, parsed } = attempt;
if (proc.timedOut) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: true,
errorMessage: `Timed out after ${timeoutSec}s`,
clearSession: Boolean(opts.clearSessionOnMissingSession),
};
}
if (!parsed) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage: parseFallbackErrorMessage(proc),
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
clearSession: Boolean(opts.clearSessionOnMissingSession),
};
}
const usage =
parsedStream.usage ??
(() => {
const usageObj = parseObject(parsed.usage);
return {
inputTokens: asNumber(usageObj.input_tokens, 0),
cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0),
outputTokens: asNumber(usageObj.output_tokens, 0),
};
})();
const resolvedSessionId =
parsedStream.sessionId ??
(asString(parsed.session_id, opts.fallbackSessionId ?? "") || opts.fallbackSessionId);
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage:
(proc.exitCode ?? 0) === 0
? null
: describeClaudeFailure(parsed) ?? `Claude exited with code ${proc.exitCode ?? -1}`,
usage,
sessionId: resolvedSessionId,
provider: "anthropic",
model: parsedStream.model || asString(parsed.model, model),
costUsd: parsedStream.costUsd ?? asNumber(parsed.total_cost_usd, 0),
resultJson: parsed,
summary: parsedStream.summary || asString(parsed.result, ""),
clearSession: Boolean(opts.clearSessionOnMissingSession && !resolvedSessionId),
};
};
const initial = await runAttempt(sessionId ?? null);
if (
sessionId &&
!initial.proc.timedOut &&
(initial.proc.exitCode ?? 0) !== 0 &&
initial.parsed &&
isClaudeUnknownSessionError(initial.parsed)
) {
await onLog(
"stderr",
`[paperclip] Claude resume session "${sessionId}" is unavailable; retrying with a fresh session.\n`,
);
const retry = await runAttempt(null);
return toAdapterResult(retry, { fallbackSessionId: null, clearSessionOnMissingSession: true });
}
return toAdapterResult(initial, { fallbackSessionId: runtime.sessionId });
}
async function executeCodexLocalRun(
runId: string,
agent: typeof agents.$inferSelect,
runtime: typeof agentRuntimeState.$inferSelect,
config: Record<string, unknown>,
context: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
onMeta?: (meta: AdapterInvocationMeta) => Promise<void>,
): Promise<AdapterExecutionResult> {
const promptTemplate = asString(
config.promptTemplate,
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
);
const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate);
const command = asString(config.command, "codex");
const model = asString(config.model, "");
const search = asBoolean(config.search, false);
const bypass = asBoolean(config.dangerouslyBypassApprovalsAndSandbox, false);
const cwd = asString(config.cwd, process.cwd());
await ensureAbsoluteDirectory(cwd);
const envConfig = parseObject(config.env);
const env: Record<string, string> = { ...buildPaperclipEnv(agent) };
for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v;
}
const runtimeEnv = ensurePathInEnv({ ...process.env, ...env });
await ensureCommandResolvable(command, cwd, runtimeEnv);
const timeoutSec = asNumber(config.timeoutSec, 1800);
const graceSec = asNumber(config.graceSec, 20);
const extraArgs = (() => {
const fromExtraArgs = asStringArray(config.extraArgs);
if (fromExtraArgs.length > 0) return fromExtraArgs;
return asStringArray(config.args);
})();
const sessionId = runtime.sessionId;
const template = sessionId ? promptTemplate : bootstrapTemplate;
const prompt = renderTemplate(template, {
company: { id: agent.companyId },
agent,
run: { id: runId, source: "on_demand" },
context,
});
const args = ["exec", "--json"];
if (search) args.unshift("--search");
if (bypass) args.push("--dangerously-bypass-approvals-and-sandbox");
if (model) args.push("--model", model);
if (extraArgs.length > 0) args.push(...extraArgs);
if (sessionId) args.push("resume", sessionId, prompt);
else args.push(prompt);
if (onMeta) {
await onMeta({
adapterType: "codex_local",
command,
cwd,
commandArgs: args.map((value, idx) => {
if (!sessionId && idx === args.length - 1) return `<prompt ${prompt.length} chars>`;
if (sessionId && idx === args.length - 1) return `<prompt ${prompt.length} chars>`;
return value;
}),
env: redactEnvForLogs(env),
prompt,
context,
});
}
const proc = await runChildProcess(runId, command, args, {
cwd,
env,
timeoutSec,
graceSec,
onLog,
});
if (proc.timedOut) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: true,
errorMessage: `Timed out after ${timeoutSec}s`,
};
}
const parsed = parseCodexJsonl(proc.stdout);
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Codex exited with code ${proc.exitCode ?? -1}`,
usage: parsed.usage,
sessionId: parsed.sessionId ?? runtime.sessionId,
provider: "openai",
model,
costUsd: null,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
summary: parsed.summary,
};
}
async function executeRun(runId: string) {
const run = await getRun(runId);
if (!run) return;
if (run.status !== "queued" && run.status !== "running") return;
const agent = await getAgent(run.agentId);
if (!agent) {
await setRunStatus(runId, "failed", {
error: "Agent not found",
errorCode: "agent_not_found",
finishedAt: new Date(),
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: "Agent not found",
});
return;
}
const runtime = await ensureRuntimeState(agent);
let seq = 1;
let handle: RunLogHandle | null = null;
let stdoutExcerpt = "";
let stderrExcerpt = "";
try {
await setRunStatus(runId, "running", {
startedAt: new Date(),
sessionIdBefore: runtime.sessionId,
});
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });
const runningAgent = await db
.update(agents)
.set({ status: "running", updatedAt: new Date() })
.where(eq(agents.id, agent.id))
.returning()
.then((rows) => rows[0] ?? null);
if (runningAgent) {
publishLiveEvent({
companyId: runningAgent.companyId,
type: "agent.status",
payload: {
agentId: runningAgent.id,
status: runningAgent.status,
outcome: "running",
},
});
}
const currentRun = (await getRun(runId)) ?? run;
await appendRunEvent(currentRun, seq++, {
eventType: "lifecycle",
stream: "system",
level: "info",
message: "run started",
});
handle = await runLogStore.begin({
companyId: run.companyId,
agentId: run.agentId,
runId,
});
await db
.update(heartbeatRuns)
.set({
logStore: handle.store,
logRef: handle.logRef,
updatedAt: new Date(),
})
.where(eq(heartbeatRuns.id, runId));
const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk);
if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk);
if (handle) {
await runLogStore.append(handle, {
stream,
chunk,
ts: new Date().toISOString(),
});
}
const payloadChunk =
chunk.length > MAX_LIVE_LOG_CHUNK_BYTES
? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
: chunk;
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.log",
payload: {
runId: run.id,
agentId: run.agentId,
stream,
chunk: payloadChunk,
truncated: payloadChunk.length !== chunk.length,
},
});
};
const config = parseObject(agent.adapterConfig);
const context = (run.contextSnapshot ?? {}) as Record<string, unknown>;
const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
await appendRunEvent(currentRun, seq++, {
eventType: "adapter.invoke",
stream: "system",
level: "info",
message: "adapter invocation",
payload: meta as Record<string, unknown>,
});
};
let adapterResult: AdapterExecutionResult;
if (agent.adapterType === "http") {
adapterResult = await executeHttpRun(run.id, agent.id, config, context);
} else if (agent.adapterType === "claude_local") {
adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog, onAdapterMeta);
} else if (agent.adapterType === "codex_local") {
adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog, onAdapterMeta);
} else {
adapterResult = await executeProcessRun(run.id, agent, config, onLog, onAdapterMeta);
}
let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
const latestRun = await getRun(run.id);
if (latestRun?.status === "cancelled") {
outcome = "cancelled";
} else if (adapterResult.timedOut) {
outcome = "timed_out";
} else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) {
outcome = "succeeded";
} else {
outcome = "failed";
}
let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
if (handle) {
logSummary = await runLogStore.finalize(handle);
}
const status =
outcome === "succeeded"
? "succeeded"
: outcome === "cancelled"
? "cancelled"
: outcome === "timed_out"
? "timed_out"
: "failed";
const usageJson =
adapterResult.usage || adapterResult.costUsd != null
? ({
...(adapterResult.usage ?? {}),
...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
} as Record<string, unknown>)
: null;
await setRunStatus(run.id, status, {
finishedAt: new Date(),
error:
outcome === "succeeded"
? null
: adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"),
errorCode:
outcome === "timed_out"
? "timeout"
: outcome === "cancelled"
? "cancelled"
: outcome === "failed"
? "adapter_failed"
: null,
exitCode: adapterResult.exitCode,
signal: adapterResult.signal,
usageJson,
resultJson: adapterResult.resultJson ?? null,
sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId,
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
logSha256: logSummary?.sha256,
logCompressed: logSummary?.compressed ?? false,
});
await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, {
finishedAt: new Date(),
error: adapterResult.errorMessage ?? null,
});
const finalizedRun = await getRun(run.id);
if (finalizedRun) {
await appendRunEvent(finalizedRun, seq++, {
eventType: "lifecycle",
stream: "system",
level: outcome === "succeeded" ? "info" : "error",
message: `run ${outcome}`,
payload: {
status,
exitCode: adapterResult.exitCode,
},
});
}
if (finalizedRun) {
await updateRuntimeState(agent, finalizedRun, adapterResult);
}
await finalizeAgentStatus(agent.id, outcome);
} catch (err) {
const message = err instanceof Error ? err.message : "Unknown adapter failure";
logger.error({ err, runId }, "heartbeat execution failed");
let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
if (handle) {
try {
logSummary = await runLogStore.finalize(handle);
} catch (finalizeErr) {
logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error");
}
}
const failedRun = await setRunStatus(run.id, "failed", {
error: message,
errorCode: "adapter_failed",
finishedAt: new Date(),
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
logSha256: logSummary?.sha256,
logCompressed: logSummary?.compressed ?? false,
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: message,
});
if (failedRun) {
await appendRunEvent(failedRun, seq++, {
eventType: "error",
stream: "system",
level: "error",
message,
});
await updateRuntimeState(agent, failedRun, {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: message,
});
}
await finalizeAgentStatus(agent.id, "failed");
}
}
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
const source = opts.source ?? "on_demand";
const triggerDetail = opts.triggerDetail ?? null;
const contextSnapshot = opts.contextSnapshot ?? {};
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
if (agent.status === "paused" || agent.status === "terminated") {
throw conflict("Agent is not invokable in its current state", { status: agent.status });
}
const policy = parseHeartbeatPolicy(agent);
const writeSkippedRequest = async (reason: string) => {
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason,
payload: opts.payload ?? null,
status: "skipped",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
finishedAt: new Date(),
});
};
if (source === "timer" && !policy.enabled) {
await writeSkippedRequest("heartbeat.disabled");
return null;
}
if (source === "assignment" && !policy.wakeOnAssignment) {
await writeSkippedRequest("heartbeat.wakeOnAssignment.disabled");
return null;
}
if (source === "automation" && !policy.wakeOnAutomation) {
await writeSkippedRequest("heartbeat.wakeOnAutomation.disabled");
return null;
}
if (source === "on_demand" && triggerDetail === "ping" && !policy.wakeOnOnDemand) {
await writeSkippedRequest("heartbeat.wakeOnOnDemand.disabled");
return null;
}
const activeRun = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
.orderBy(desc(heartbeatRuns.createdAt))
.then((rows) => rows[0] ?? null);
if (activeRun) {
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason: opts.reason ?? null,
payload: opts.payload ?? null,
status: "coalesced",
coalescedCount: 1,
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
runId: activeRun.id,
finishedAt: new Date(),
});
return activeRun;
}
const wakeupRequest = await db
.insert(agentWakeupRequests)
.values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason: opts.reason ?? null,
payload: opts.payload ?? null,
status: "queued",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
})
.returning()
.then((rows) => rows[0]);
const runtime = await getRuntimeState(agent.id);
const run = await db
.insert(heartbeatRuns)
.values({
companyId: agent.companyId,
agentId,
invocationSource: source,
triggerDetail,
status: "queued",
wakeupRequestId: wakeupRequest.id,
contextSnapshot,
sessionIdBefore: runtime?.sessionId ?? null,
})
.returning()
.then((rows) => rows[0]);
await db
.update(agentWakeupRequests)
.set({
runId: run.id,
updatedAt: new Date(),
})
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.queued",
payload: {
runId: run.id,
agentId: run.agentId,
invocationSource: run.invocationSource,
triggerDetail: run.triggerDetail,
wakeupRequestId: run.wakeupRequestId,
},
});
void executeRun(run.id).catch((err) => {
logger.error({ err, runId: run.id }, "heartbeat execution failed");
});
return run;
}
return {
list: (companyId: string, agentId?: string) => {
if (!agentId) {
return db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.companyId, companyId))
.orderBy(desc(heartbeatRuns.createdAt));
}
return db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId)))
.orderBy(desc(heartbeatRuns.createdAt));
},
getRun,
getRuntimeState: async (agentId: string) => {
const state = await getRuntimeState(agentId);
if (state) return state;
const agent = await getAgent(agentId);
if (!agent) return null;
return ensureRuntimeState(agent);
},
resetRuntimeSession: async (agentId: string) => {
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
await ensureRuntimeState(agent);
return db
.update(agentRuntimeState)
.set({
sessionId: null,
stateJson: {},
lastError: null,
updatedAt: new Date(),
})
.where(eq(agentRuntimeState.agentId, agentId))
.returning()
.then((rows) => rows[0] ?? null);
},
listEvents: (runId: string, afterSeq = 0, limit = 200) =>
db
.select()
.from(heartbeatRunEvents)
.where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq)))
.orderBy(asc(heartbeatRunEvents.seq))
.limit(Math.max(1, Math.min(limit, 1000))),
readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (!run.logStore || !run.logRef) throw notFound("Run log not found");
const result = await runLogStore.read(
{
store: run.logStore as "local_file",
logRef: run.logRef,
},
opts,
);
return {
runId,
store: run.logStore,
logRef: run.logRef,
...result,
};
},
invoke: async (
agentId: string,
source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
contextSnapshot: Record<string, unknown> = {},
triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
) =>
enqueueWakeup(agentId, {
source,
triggerDetail,
contextSnapshot,
requestedByActorType: actor?.actorType,
requestedByActorId: actor?.actorId ?? null,
}),
wakeup: enqueueWakeup,
tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;
let enqueued = 0;
let skipped = 0;
for (const agent of allAgents) {
if (agent.status === "paused" || agent.status === "terminated") continue;
const policy = parseHeartbeatPolicy(agent);
if (!policy.enabled || policy.intervalSec <= 0) continue;
checked += 1;
const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0;
const elapsedMs = now.getTime() - last;
if (last && elapsedMs < policy.intervalSec * 1000) continue;
const run = await enqueueWakeup(agent.id, {
source: "timer",
triggerDetail: "system",
reason: "heartbeat_timer",
requestedByActorType: "system",
requestedByActorId: "heartbeat_scheduler",
contextSnapshot: {
source: "scheduler",
reason: "interval_elapsed",
now: now.toISOString(),
},
});
if (run) enqueued += 1;
else skipped += 1;
}
return { checked, enqueued, skipped };
},
cancelRun: async (runId: string) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (run.status !== "running" && run.status !== "queued") return run;
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
const graceMs = Math.max(1, running.graceSec) * 1000;
setTimeout(() => {
if (!running.child.killed) {
running.child.kill("SIGKILL");
}
}, graceMs);
}
const cancelled = await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
});
if (cancelled) {
await appendRunEvent(cancelled, 1, {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "run cancelled",
});
}
runningProcesses.delete(run.id);
await finalizeAgentStatus(run.agentId, "cancelled");
return cancelled;
},
cancelActiveForAgent: async (agentId: string) => {
const runs = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
for (const run of runs) {
await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
});
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
runningProcesses.delete(run.id);
}
}
return runs.length;
},
};
}