Honor explicit failed-run session resume

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-03-21 17:09:38 -05:00
parent 02c779b41d
commit eac3f3fa69
2 changed files with 186 additions and 8 deletions

View file

@ -1,7 +1,9 @@
import { describe, expect, it } from "vitest";
import type { agents } from "@paperclipai/db";
import { sessionCodec as codexSessionCodec } from "@paperclipai/adapter-codex-local/server";
import { resolveDefaultAgentWorkspaceDir } from "../home-paths.js";
import {
buildExplicitResumeSessionOverride,
formatRuntimeWorkspaceWarningLog,
prioritizeProjectWorkspaceCandidatesForRun,
parseSessionCompactionPolicy,
@ -182,6 +184,57 @@ describe("shouldResetTaskSessionForWake", () => {
});
});
describe("buildExplicitResumeSessionOverride", () => {
it("reuses saved task session params when they belong to the selected failed run", () => {
const result = buildExplicitResumeSessionOverride({
resumeFromRunId: "run-1",
resumeRunSessionIdBefore: "session-before",
resumeRunSessionIdAfter: "session-after",
taskSession: {
sessionParamsJson: {
sessionId: "session-after",
cwd: "/tmp/project",
},
sessionDisplayId: "session-after",
lastRunId: "run-1",
},
sessionCodec: codexSessionCodec,
});
expect(result).toEqual({
sessionDisplayId: "session-after",
sessionParams: {
sessionId: "session-after",
cwd: "/tmp/project",
},
});
});
it("falls back to the selected run session id when no matching task session params are available", () => {
const result = buildExplicitResumeSessionOverride({
resumeFromRunId: "run-1",
resumeRunSessionIdBefore: "session-before",
resumeRunSessionIdAfter: "session-after",
taskSession: {
sessionParamsJson: {
sessionId: "other-session",
cwd: "/tmp/project",
},
sessionDisplayId: "other-session",
lastRunId: "run-2",
},
sessionCodec: codexSessionCodec,
});
expect(result).toEqual({
sessionDisplayId: "session-after",
sessionParams: {
sessionId: "session-after",
},
});
});
});
describe("formatRuntimeWorkspaceWarningLog", () => {
it("emits informational workspace warnings on stdout", () => {
expect(formatRuntimeWorkspaceWarningLog("Using fallback workspace")).toEqual({

View file

@ -326,6 +326,51 @@ async function resolveLedgerScopeForRun(
};
}
type ResumeSessionRow = {
sessionParamsJson: Record<string, unknown> | null;
sessionDisplayId: string | null;
lastRunId: string | null;
};
export function buildExplicitResumeSessionOverride(input: {
resumeFromRunId: string;
resumeRunSessionIdBefore: string | null;
resumeRunSessionIdAfter: string | null;
taskSession: ResumeSessionRow | null;
sessionCodec: AdapterSessionCodec;
}) {
const desiredDisplayId = truncateDisplayId(
input.resumeRunSessionIdAfter ?? input.resumeRunSessionIdBefore,
);
const taskSessionParams = normalizeSessionParams(
input.sessionCodec.deserialize(input.taskSession?.sessionParamsJson ?? null),
);
const taskSessionDisplayId = truncateDisplayId(
input.taskSession?.sessionDisplayId ??
(input.sessionCodec.getDisplayId ? input.sessionCodec.getDisplayId(taskSessionParams) : null) ??
readNonEmptyString(taskSessionParams?.sessionId),
);
const canReuseTaskSessionParams =
input.taskSession != null &&
(
input.taskSession.lastRunId === input.resumeFromRunId ||
(!!desiredDisplayId && taskSessionDisplayId === desiredDisplayId)
);
const sessionParams =
canReuseTaskSessionParams
? taskSessionParams
: desiredDisplayId
? { sessionId: desiredDisplayId }
: null;
const sessionDisplayId = desiredDisplayId ?? (canReuseTaskSessionParams ? taskSessionDisplayId : null);
if (!sessionDisplayId && !sessionParams) return null;
return {
sessionDisplayId,
sessionParams,
};
}
function normalizeUsageTotals(usage: UsageSummary | null | undefined): UsageTotals | null {
if (!usage) return null;
return {
@ -978,6 +1023,57 @@ export function heartbeatService(db: Db) {
return runtimeForRun?.sessionId ?? null;
}
async function resolveExplicitResumeSessionOverride(
agent: typeof agents.$inferSelect,
payload: Record<string, unknown> | null,
taskKey: string | null,
) {
const resumeFromRunId = readNonEmptyString(payload?.resumeFromRunId);
if (!resumeFromRunId) return null;
const resumeRun = await db
.select({
id: heartbeatRuns.id,
contextSnapshot: heartbeatRuns.contextSnapshot,
sessionIdBefore: heartbeatRuns.sessionIdBefore,
sessionIdAfter: heartbeatRuns.sessionIdAfter,
})
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.id, resumeFromRunId),
eq(heartbeatRuns.companyId, agent.companyId),
eq(heartbeatRuns.agentId, agent.id),
),
)
.then((rows) => rows[0] ?? null);
if (!resumeRun) return null;
const resumeContext = parseObject(resumeRun.contextSnapshot);
const resumeTaskKey = deriveTaskKey(resumeContext, null) ?? taskKey;
const resumeTaskSession = resumeTaskKey
? await getTaskSession(agent.companyId, agent.id, agent.adapterType, resumeTaskKey)
: null;
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
const sessionOverride = buildExplicitResumeSessionOverride({
resumeFromRunId,
resumeRunSessionIdBefore: resumeRun.sessionIdBefore,
resumeRunSessionIdAfter: resumeRun.sessionIdAfter,
taskSession: resumeTaskSession,
sessionCodec,
});
if (!sessionOverride) return null;
return {
resumeFromRunId,
taskKey: resumeTaskKey,
issueId: readNonEmptyString(resumeContext.issueId),
taskId: readNonEmptyString(resumeContext.taskId) ?? readNonEmptyString(resumeContext.issueId),
sessionDisplayId: sessionOverride.sessionDisplayId,
sessionParams: sessionOverride.sessionParams,
};
}
async function resolveWorkspaceForRun(
agent: typeof agents.$inferSelect,
context: Record<string, unknown>,
@ -1921,9 +2017,18 @@ export function heartbeatService(db: Db) {
const resetTaskSession = shouldResetTaskSessionForWake(context);
const sessionResetReason = describeSessionResetReason(context);
const taskSessionForRun = resetTaskSession ? null : taskSession;
const previousSessionParams = normalizeSessionParams(
sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null),
const explicitResumeSessionParams = normalizeSessionParams(
sessionCodec.deserialize(parseObject(context.resumeSessionParams)),
);
const explicitResumeSessionDisplayId = truncateDisplayId(
readNonEmptyString(context.resumeSessionDisplayId) ??
(sessionCodec.getDisplayId ? sessionCodec.getDisplayId(explicitResumeSessionParams) : null) ??
readNonEmptyString(explicitResumeSessionParams?.sessionId),
);
const previousSessionParams =
explicitResumeSessionParams ??
(explicitResumeSessionDisplayId ? { sessionId: explicitResumeSessionDisplayId } : null) ??
normalizeSessionParams(sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null));
const config = parseObject(agent.adapterConfig);
const executionWorkspaceMode = resolveExecutionWorkspaceMode({
projectPolicy: projectExecutionWorkspacePolicy,
@ -2190,7 +2295,8 @@ export function heartbeatService(db: Db) {
}
const runtimeSessionFallback = taskKey || resetTaskSession ? null : runtime.sessionId;
let previousSessionDisplayId = truncateDisplayId(
taskSessionForRun?.sessionDisplayId ??
explicitResumeSessionDisplayId ??
taskSessionForRun?.sessionDisplayId ??
(sessionCodec.getDisplayId ? sessionCodec.getDisplayId(runtimeSessionParams) : null) ??
readNonEmptyString(runtimeSessionParams?.sessionId) ??
runtimeSessionFallback,
@ -2801,7 +2907,9 @@ export function heartbeatService(db: Db) {
payload: promotedPayload,
});
const sessionBefore = await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey);
const sessionBefore =
readNonEmptyString(promotedContextSnapshot.resumeSessionDisplayId) ??
await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey);
const now = new Date();
const newRun = await tx
.insert(heartbeatRuns)
@ -2880,10 +2988,30 @@ export function heartbeatService(db: Db) {
triggerDetail,
payload,
});
const issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload;
let issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload;
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
const explicitResumeSession = await resolveExplicitResumeSessionOverride(agent, payload, taskKey);
if (explicitResumeSession) {
enrichedContextSnapshot.resumeFromRunId = explicitResumeSession.resumeFromRunId;
enrichedContextSnapshot.resumeSessionDisplayId = explicitResumeSession.sessionDisplayId;
enrichedContextSnapshot.resumeSessionParams = explicitResumeSession.sessionParams;
if (!readNonEmptyString(enrichedContextSnapshot.issueId) && explicitResumeSession.issueId) {
enrichedContextSnapshot.issueId = explicitResumeSession.issueId;
}
if (!readNonEmptyString(enrichedContextSnapshot.taskId) && explicitResumeSession.taskId) {
enrichedContextSnapshot.taskId = explicitResumeSession.taskId;
}
if (!readNonEmptyString(enrichedContextSnapshot.taskKey) && explicitResumeSession.taskKey) {
enrichedContextSnapshot.taskKey = explicitResumeSession.taskKey;
}
issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueId;
}
const effectiveTaskKey = readNonEmptyString(enrichedContextSnapshot.taskKey) ?? taskKey;
const sessionBefore =
explicitResumeSession?.sessionDisplayId ??
await resolveSessionBeforeForWakeup(agent, effectiveTaskKey);
const writeSkippedRequest = async (skipReason: string) => {
await db.insert(agentWakeupRequests).values({
@ -2947,7 +3075,6 @@ export function heartbeatService(db: Db) {
if (issueId && !bypassIssueExecutionLock) {
const agentNameKey = normalizeAgentNameKey(agent.name);
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
const outcome = await db.transaction(async (tx) => {
await tx.execute(
@ -3298,8 +3425,6 @@ export function heartbeatService(db: Db) {
.returning()
.then((rows) => rows[0]);
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
const newRun = await db
.insert(heartbeatRuns)
.values({