From eac3f3fa69df7687400f426e2a1b11a78ef6e537 Mon Sep 17 00:00:00 2001 From: dotta Date: Sat, 21 Mar 2026 17:09:38 -0500 Subject: [PATCH] Honor explicit failed-run session resume Co-Authored-By: Paperclip --- .../heartbeat-workspace-session.test.ts | 53 +++++++ server/src/services/heartbeat.ts | 141 +++++++++++++++++- 2 files changed, 186 insertions(+), 8 deletions(-) diff --git a/server/src/__tests__/heartbeat-workspace-session.test.ts b/server/src/__tests__/heartbeat-workspace-session.test.ts index 79d781e9..7fab2b42 100644 --- a/server/src/__tests__/heartbeat-workspace-session.test.ts +++ b/server/src/__tests__/heartbeat-workspace-session.test.ts @@ -1,7 +1,9 @@ import { describe, expect, it } from "vitest"; import type { agents } from "@paperclipai/db"; +import { sessionCodec as codexSessionCodec } from "@paperclipai/adapter-codex-local/server"; import { resolveDefaultAgentWorkspaceDir } from "../home-paths.js"; import { + buildExplicitResumeSessionOverride, formatRuntimeWorkspaceWarningLog, prioritizeProjectWorkspaceCandidatesForRun, parseSessionCompactionPolicy, @@ -182,6 +184,57 @@ describe("shouldResetTaskSessionForWake", () => { }); }); +describe("buildExplicitResumeSessionOverride", () => { + it("reuses saved task session params when they belong to the selected failed run", () => { + const result = buildExplicitResumeSessionOverride({ + resumeFromRunId: "run-1", + resumeRunSessionIdBefore: "session-before", + resumeRunSessionIdAfter: "session-after", + taskSession: { + sessionParamsJson: { + sessionId: "session-after", + cwd: "/tmp/project", + }, + sessionDisplayId: "session-after", + lastRunId: "run-1", + }, + sessionCodec: codexSessionCodec, + }); + + expect(result).toEqual({ + sessionDisplayId: "session-after", + sessionParams: { + sessionId: "session-after", + cwd: "/tmp/project", + }, + }); + }); + + it("falls back to the selected run session id when no matching task session params are available", () => { + const result = buildExplicitResumeSessionOverride({ + resumeFromRunId: "run-1", + resumeRunSessionIdBefore: "session-before", + resumeRunSessionIdAfter: "session-after", + taskSession: { + sessionParamsJson: { + sessionId: "other-session", + cwd: "/tmp/project", + }, + sessionDisplayId: "other-session", + lastRunId: "run-2", + }, + sessionCodec: codexSessionCodec, + }); + + expect(result).toEqual({ + sessionDisplayId: "session-after", + sessionParams: { + sessionId: "session-after", + }, + }); + }); +}); + describe("formatRuntimeWorkspaceWarningLog", () => { it("emits informational workspace warnings on stdout", () => { expect(formatRuntimeWorkspaceWarningLog("Using fallback workspace")).toEqual({ diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 96a42f7e..c909b9b7 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -326,6 +326,51 @@ async function resolveLedgerScopeForRun( }; } +type ResumeSessionRow = { + sessionParamsJson: Record | null; + sessionDisplayId: string | null; + lastRunId: string | null; +}; + +export function buildExplicitResumeSessionOverride(input: { + resumeFromRunId: string; + resumeRunSessionIdBefore: string | null; + resumeRunSessionIdAfter: string | null; + taskSession: ResumeSessionRow | null; + sessionCodec: AdapterSessionCodec; +}) { + const desiredDisplayId = truncateDisplayId( + input.resumeRunSessionIdAfter ?? input.resumeRunSessionIdBefore, + ); + const taskSessionParams = normalizeSessionParams( + input.sessionCodec.deserialize(input.taskSession?.sessionParamsJson ?? null), + ); + const taskSessionDisplayId = truncateDisplayId( + input.taskSession?.sessionDisplayId ?? + (input.sessionCodec.getDisplayId ? input.sessionCodec.getDisplayId(taskSessionParams) : null) ?? + readNonEmptyString(taskSessionParams?.sessionId), + ); + const canReuseTaskSessionParams = + input.taskSession != null && + ( + input.taskSession.lastRunId === input.resumeFromRunId || + (!!desiredDisplayId && taskSessionDisplayId === desiredDisplayId) + ); + const sessionParams = + canReuseTaskSessionParams + ? taskSessionParams + : desiredDisplayId + ? { sessionId: desiredDisplayId } + : null; + const sessionDisplayId = desiredDisplayId ?? (canReuseTaskSessionParams ? taskSessionDisplayId : null); + + if (!sessionDisplayId && !sessionParams) return null; + return { + sessionDisplayId, + sessionParams, + }; +} + function normalizeUsageTotals(usage: UsageSummary | null | undefined): UsageTotals | null { if (!usage) return null; return { @@ -978,6 +1023,57 @@ export function heartbeatService(db: Db) { return runtimeForRun?.sessionId ?? null; } + async function resolveExplicitResumeSessionOverride( + agent: typeof agents.$inferSelect, + payload: Record | null, + taskKey: string | null, + ) { + const resumeFromRunId = readNonEmptyString(payload?.resumeFromRunId); + if (!resumeFromRunId) return null; + + const resumeRun = await db + .select({ + id: heartbeatRuns.id, + contextSnapshot: heartbeatRuns.contextSnapshot, + sessionIdBefore: heartbeatRuns.sessionIdBefore, + sessionIdAfter: heartbeatRuns.sessionIdAfter, + }) + .from(heartbeatRuns) + .where( + and( + eq(heartbeatRuns.id, resumeFromRunId), + eq(heartbeatRuns.companyId, agent.companyId), + eq(heartbeatRuns.agentId, agent.id), + ), + ) + .then((rows) => rows[0] ?? null); + if (!resumeRun) return null; + + const resumeContext = parseObject(resumeRun.contextSnapshot); + const resumeTaskKey = deriveTaskKey(resumeContext, null) ?? taskKey; + const resumeTaskSession = resumeTaskKey + ? await getTaskSession(agent.companyId, agent.id, agent.adapterType, resumeTaskKey) + : null; + const sessionCodec = getAdapterSessionCodec(agent.adapterType); + const sessionOverride = buildExplicitResumeSessionOverride({ + resumeFromRunId, + resumeRunSessionIdBefore: resumeRun.sessionIdBefore, + resumeRunSessionIdAfter: resumeRun.sessionIdAfter, + taskSession: resumeTaskSession, + sessionCodec, + }); + if (!sessionOverride) return null; + + return { + resumeFromRunId, + taskKey: resumeTaskKey, + issueId: readNonEmptyString(resumeContext.issueId), + taskId: readNonEmptyString(resumeContext.taskId) ?? readNonEmptyString(resumeContext.issueId), + sessionDisplayId: sessionOverride.sessionDisplayId, + sessionParams: sessionOverride.sessionParams, + }; + } + async function resolveWorkspaceForRun( agent: typeof agents.$inferSelect, context: Record, @@ -1921,9 +2017,18 @@ export function heartbeatService(db: Db) { const resetTaskSession = shouldResetTaskSessionForWake(context); const sessionResetReason = describeSessionResetReason(context); const taskSessionForRun = resetTaskSession ? null : taskSession; - const previousSessionParams = normalizeSessionParams( - sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null), + const explicitResumeSessionParams = normalizeSessionParams( + sessionCodec.deserialize(parseObject(context.resumeSessionParams)), ); + const explicitResumeSessionDisplayId = truncateDisplayId( + readNonEmptyString(context.resumeSessionDisplayId) ?? + (sessionCodec.getDisplayId ? sessionCodec.getDisplayId(explicitResumeSessionParams) : null) ?? + readNonEmptyString(explicitResumeSessionParams?.sessionId), + ); + const previousSessionParams = + explicitResumeSessionParams ?? + (explicitResumeSessionDisplayId ? { sessionId: explicitResumeSessionDisplayId } : null) ?? + normalizeSessionParams(sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null)); const config = parseObject(agent.adapterConfig); const executionWorkspaceMode = resolveExecutionWorkspaceMode({ projectPolicy: projectExecutionWorkspacePolicy, @@ -2190,7 +2295,8 @@ export function heartbeatService(db: Db) { } const runtimeSessionFallback = taskKey || resetTaskSession ? null : runtime.sessionId; let previousSessionDisplayId = truncateDisplayId( - taskSessionForRun?.sessionDisplayId ?? + explicitResumeSessionDisplayId ?? + taskSessionForRun?.sessionDisplayId ?? (sessionCodec.getDisplayId ? sessionCodec.getDisplayId(runtimeSessionParams) : null) ?? readNonEmptyString(runtimeSessionParams?.sessionId) ?? runtimeSessionFallback, @@ -2801,7 +2907,9 @@ export function heartbeatService(db: Db) { payload: promotedPayload, }); - const sessionBefore = await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey); + const sessionBefore = + readNonEmptyString(promotedContextSnapshot.resumeSessionDisplayId) ?? + await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey); const now = new Date(); const newRun = await tx .insert(heartbeatRuns) @@ -2880,10 +2988,30 @@ export function heartbeatService(db: Db) { triggerDetail, payload, }); - const issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload; + let issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload; const agent = await getAgent(agentId); if (!agent) throw notFound("Agent not found"); + const explicitResumeSession = await resolveExplicitResumeSessionOverride(agent, payload, taskKey); + if (explicitResumeSession) { + enrichedContextSnapshot.resumeFromRunId = explicitResumeSession.resumeFromRunId; + enrichedContextSnapshot.resumeSessionDisplayId = explicitResumeSession.sessionDisplayId; + enrichedContextSnapshot.resumeSessionParams = explicitResumeSession.sessionParams; + if (!readNonEmptyString(enrichedContextSnapshot.issueId) && explicitResumeSession.issueId) { + enrichedContextSnapshot.issueId = explicitResumeSession.issueId; + } + if (!readNonEmptyString(enrichedContextSnapshot.taskId) && explicitResumeSession.taskId) { + enrichedContextSnapshot.taskId = explicitResumeSession.taskId; + } + if (!readNonEmptyString(enrichedContextSnapshot.taskKey) && explicitResumeSession.taskKey) { + enrichedContextSnapshot.taskKey = explicitResumeSession.taskKey; + } + issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueId; + } + const effectiveTaskKey = readNonEmptyString(enrichedContextSnapshot.taskKey) ?? taskKey; + const sessionBefore = + explicitResumeSession?.sessionDisplayId ?? + await resolveSessionBeforeForWakeup(agent, effectiveTaskKey); const writeSkippedRequest = async (skipReason: string) => { await db.insert(agentWakeupRequests).values({ @@ -2947,7 +3075,6 @@ export function heartbeatService(db: Db) { if (issueId && !bypassIssueExecutionLock) { const agentNameKey = normalizeAgentNameKey(agent.name); - const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); const outcome = await db.transaction(async (tx) => { await tx.execute( @@ -3298,8 +3425,6 @@ export function heartbeatService(db: Db) { .returning() .then((rows) => rows[0]); - const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); - const newRun = await db .insert(heartbeatRuns) .values({