Merge pull request #2065 from edimuj/fix/heartbeat-session-reuse
fix: preserve session continuity for timer/heartbeat wakes
This commit is contained in:
commit
9f1bb350fe
2 changed files with 61 additions and 2 deletions
|
|
@ -6,6 +6,7 @@ import {
|
||||||
applyPersistedExecutionWorkspaceConfig,
|
applyPersistedExecutionWorkspaceConfig,
|
||||||
buildRealizedExecutionWorkspaceFromPersisted,
|
buildRealizedExecutionWorkspaceFromPersisted,
|
||||||
buildExplicitResumeSessionOverride,
|
buildExplicitResumeSessionOverride,
|
||||||
|
deriveTaskKeyWithHeartbeatFallback,
|
||||||
formatRuntimeWorkspaceWarningLog,
|
formatRuntimeWorkspaceWarningLog,
|
||||||
prioritizeProjectWorkspaceCandidatesForRun,
|
prioritizeProjectWorkspaceCandidatesForRun,
|
||||||
parseSessionCompactionPolicy,
|
parseSessionCompactionPolicy,
|
||||||
|
|
@ -328,6 +329,34 @@ describe("shouldResetTaskSessionForWake", () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("deriveTaskKeyWithHeartbeatFallback", () => {
|
||||||
|
it("returns explicit taskKey when present", () => {
|
||||||
|
expect(deriveTaskKeyWithHeartbeatFallback({ taskKey: "issue-123" }, null)).toBe("issue-123");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns explicit issueId when no taskKey", () => {
|
||||||
|
expect(deriveTaskKeyWithHeartbeatFallback({ issueId: "issue-456" }, null)).toBe("issue-456");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns __heartbeat__ for timer wakes with no explicit key", () => {
|
||||||
|
expect(deriveTaskKeyWithHeartbeatFallback({ wakeSource: "timer" }, null)).toBe("__heartbeat__");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("prefers explicit key over heartbeat fallback even on timer wakes", () => {
|
||||||
|
expect(
|
||||||
|
deriveTaskKeyWithHeartbeatFallback({ wakeSource: "timer", taskKey: "issue-789" }, null),
|
||||||
|
).toBe("issue-789");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null for non-timer wakes with no explicit key", () => {
|
||||||
|
expect(deriveTaskKeyWithHeartbeatFallback({ wakeSource: "on_demand" }, null)).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null for empty context", () => {
|
||||||
|
expect(deriveTaskKeyWithHeartbeatFallback({}, null)).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("buildExplicitResumeSessionOverride", () => {
|
describe("buildExplicitResumeSessionOverride", () => {
|
||||||
it("reuses saved task session params when they belong to the selected failed run", () => {
|
it("reuses saved task session params when they belong to the selected failed run", () => {
|
||||||
const result = buildExplicitResumeSessionOverride({
|
const result = buildExplicitResumeSessionOverride({
|
||||||
|
|
|
||||||
|
|
@ -607,6 +607,14 @@ function parseIssueAssigneeAdapterOverrides(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synthetic task key for timer/heartbeat wakes that have no issue context.
|
||||||
|
* This allows timer wakes to participate in the `agentTaskSessions` system
|
||||||
|
* and benefit from robust session resume, instead of relying solely on the
|
||||||
|
* simpler `agentRuntimeState.sessionId` fallback.
|
||||||
|
*/
|
||||||
|
const HEARTBEAT_TASK_KEY = "__heartbeat__";
|
||||||
|
|
||||||
function deriveTaskKey(
|
function deriveTaskKey(
|
||||||
contextSnapshot: Record<string, unknown> | null | undefined,
|
contextSnapshot: Record<string, unknown> | null | undefined,
|
||||||
payload: Record<string, unknown> | null | undefined,
|
payload: Record<string, unknown> | null | undefined,
|
||||||
|
|
@ -622,6 +630,28 @@ function deriveTaskKey(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extended task key derivation that falls back to a stable synthetic key
|
||||||
|
* for timer/heartbeat wakes. This ensures timer wakes can resume their
|
||||||
|
* previous session via `agentTaskSessions` instead of starting fresh.
|
||||||
|
*
|
||||||
|
* The synthetic key is only used when:
|
||||||
|
* - No explicit task/issue key exists in the context
|
||||||
|
* - The wake source is "timer" (scheduled heartbeat)
|
||||||
|
*/
|
||||||
|
export function deriveTaskKeyWithHeartbeatFallback(
|
||||||
|
contextSnapshot: Record<string, unknown> | null | undefined,
|
||||||
|
payload: Record<string, unknown> | null | undefined,
|
||||||
|
) {
|
||||||
|
const explicit = deriveTaskKey(contextSnapshot, payload);
|
||||||
|
if (explicit) return explicit;
|
||||||
|
|
||||||
|
const wakeSource = readNonEmptyString(contextSnapshot?.wakeSource);
|
||||||
|
if (wakeSource === "timer") return HEARTBEAT_TASK_KEY;
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
export function shouldResetTaskSessionForWake(
|
export function shouldResetTaskSessionForWake(
|
||||||
contextSnapshot: Record<string, unknown> | null | undefined,
|
contextSnapshot: Record<string, unknown> | null | undefined,
|
||||||
) {
|
) {
|
||||||
|
|
@ -1595,7 +1625,7 @@ export function heartbeatService(db: Db) {
|
||||||
) {
|
) {
|
||||||
const contextSnapshot = parseObject(run.contextSnapshot);
|
const contextSnapshot = parseObject(run.contextSnapshot);
|
||||||
const issueId = readNonEmptyString(contextSnapshot.issueId);
|
const issueId = readNonEmptyString(contextSnapshot.issueId);
|
||||||
const taskKey = deriveTaskKey(contextSnapshot, null);
|
const taskKey = deriveTaskKeyWithHeartbeatFallback(contextSnapshot, null);
|
||||||
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
|
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
|
||||||
const retryContextSnapshot = {
|
const retryContextSnapshot = {
|
||||||
...contextSnapshot,
|
...contextSnapshot,
|
||||||
|
|
@ -2050,7 +2080,7 @@ export function heartbeatService(db: Db) {
|
||||||
|
|
||||||
const runtime = await ensureRuntimeState(agent);
|
const runtime = await ensureRuntimeState(agent);
|
||||||
const context = parseObject(run.contextSnapshot);
|
const context = parseObject(run.contextSnapshot);
|
||||||
const taskKey = deriveTaskKey(context, null);
|
const taskKey = deriveTaskKeyWithHeartbeatFallback(context, null);
|
||||||
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
|
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
|
||||||
const issueId = readNonEmptyString(context.issueId);
|
const issueId = readNonEmptyString(context.issueId);
|
||||||
const issueContext = issueId
|
const issueContext = issueId
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue