From 9944622d4f6d87848356ce9154e7230ec45a3772 Mon Sep 17 00:00:00 2001 From: joachimBrindeau Date: Sat, 30 May 2026 19:42:10 +0000 Subject: [PATCH] Recover agents stuck in `error` after upstream quota windows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When an agent's heartbeat call fails (shared-subscription quota exhaustion is the recurring case), `finalizeAgentStatus` flips it to `status: "error"` and freezes `lastHeartbeatAt`. With the default `runtimeConfig.heartbeat` policy (empty / disabled), the timer-driven `tickTimers` path never re-evaluates the agent, and the event-driven recovery sweeps only fire on issue activity. Agents responsible for quiet specialties (infra, security, legacy modernization) therefore stay error-stuck after the upstream quota window has clearly reopened for healthier peers in the same fleet — observed: 8 agents frozen 2-3 days while sibling agents heartbeated normally every ~90 minutes. Adds a `recoverErroredAgents` sweep that walks `status: "error"` rows whose `lastHeartbeatAt` (falling back to `updatedAt` for agents that have never heartbeated) is older than `STICKY_ERROR_RECOVERY_MIN_AGE_MS` (2h) and flips them to `idle` — mirroring `POST /api/agents/:id/resume` exactly, so the next event-driven wake (assignment, mention, recovery sweep) picks them up naturally. `lastHeartbeatAt` is intentionally NOT bumped, preserving the original failure timestamp for audit. The UPDATE repeats the `status: "error"` check in its WHERE clause so a concurrent /resume or fresh heartbeat can't be clobbered. Wires the sweep into the existing periodic interval in `index.ts` alongside `reapOrphanedRuns` etc. It runs independently so a slow reaper can't starve quota-recovered agents. The 2h floor sits below Anthropic's daily shared-subscription quota reset cadence so every natural quota window auto-recovers, while keeping real (non-quota) failures visible for two hours before silent recovery. 
Coverage: new `heartbeat-sticky-error-recovery.test.ts` covers the recovery floor (above/below), non-error status protection, the coalesce(updatedAt) fallback, the batch limit, activity-log emission, and idempotency across back-to-back sweeps. --- .../heartbeat-sticky-error-recovery.test.ts | 210 ++++++++++++++++++ server/src/index.ts | 18 ++ server/src/services/heartbeat.ts | 87 ++++++++ 3 files changed, 315 insertions(+) create mode 100644 server/src/__tests__/heartbeat-sticky-error-recovery.test.ts diff --git a/server/src/__tests__/heartbeat-sticky-error-recovery.test.ts b/server/src/__tests__/heartbeat-sticky-error-recovery.test.ts new file mode 100644 index 00000000000..8775a205477 --- /dev/null +++ b/server/src/__tests__/heartbeat-sticky-error-recovery.test.ts @@ -0,0 +1,210 @@ +import { randomUUID } from "node:crypto"; +import { eq } from "drizzle-orm"; +import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; +import { + activityLog, + agents, + agentRuntimeState, + agentWakeupRequests, + companies, + createDb, + heartbeatRunEvents, + heartbeatRuns, +} from "@paperclipai/db"; +import { + getEmbeddedPostgresTestSupport, + startEmbeddedPostgresTestDatabase, +} from "./helpers/embedded-postgres.js"; +import { + STICKY_ERROR_RECOVERY_MIN_AGE_MS, + heartbeatService, +} from "../services/heartbeat.ts"; + +const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); +const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; + +if (!embeddedPostgresSupport.supported) { + console.warn( + `Skipping embedded Postgres sticky-error recovery tests on this host: ${embeddedPostgresSupport.reason ?? 
"unsupported environment"}`, + ); +} + +describeEmbeddedPostgres("heartbeat sticky-error recovery sweep", () => { + let db!: ReturnType<typeof createDb>; + let heartbeat!: ReturnType<typeof heartbeatService>; + let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null; + + beforeAll(async () => { + tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-sticky-error-recovery-"); + db = createDb(tempDb.connectionString); + heartbeat = heartbeatService(db); + }, 20_000); + + afterEach(async () => { + await db.delete(heartbeatRunEvents); + await db.delete(heartbeatRuns); + await db.delete(agentWakeupRequests); + await db.delete(agentRuntimeState); + await db.delete(activityLog); + await db.delete(agents); + await db.delete(companies); + }); + + afterAll(async () => { + await tempDb?.cleanup(); + }); + + async function seedCompany() { + const id = randomUUID(); + await db.insert(companies).values({ + id, + name: "Paperclip", + issuePrefix: `T${id.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + return id; + } + + async function seedAgent(input: { + companyId: string; + status: "idle" | "error" | "paused" | "terminated" | "running" | "pending_approval"; + lastHeartbeatAt: Date | null; + pauseReason?: string | null; + }) { + const id = randomUUID(); + await db.insert(agents).values({ + id, + companyId: input.companyId, + name: `Agent-${id.slice(0, 8)}`, + role: "engineer", + status: input.status, + adapterType: "claude_local", + adapterConfig: {}, + runtimeConfig: {}, + permissions: {}, + lastHeartbeatAt: input.lastHeartbeatAt, + pauseReason: input.pauseReason ?? null, + pausedAt: input.pauseReason ? 
new Date() : null, + }); + return id; + } + + it("flips error agents older than the recovery floor back to idle", async () => { + const companyId = await seedCompany(); + const now = new Date("2026-05-30T20:00:00.000Z"); + const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000)); + const stuckId = await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale }); + + const result = await heartbeat.recoverErroredAgents(now); + + expect(result).toEqual({ candidates: 1, recovered: 1 }); + const [after] = await db.select().from(agents).where(eq(agents.id, stuckId)); + expect(after?.status).toBe("idle"); + expect(after?.pauseReason).toBeNull(); + expect(after?.pausedAt).toBeNull(); + // lastHeartbeatAt is intentionally NOT bumped — preserves the original + // failure timestamp so operators can audit how long the agent was stuck. + expect(after?.lastHeartbeatAt?.toISOString()).toBe(stale.toISOString()); + }); + + it("leaves error agents inside the recovery floor untouched", async () => { + const companyId = await seedCompany(); + const now = new Date("2026-05-30T20:00:00.000Z"); + const recent = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS - 60_000)); + const id = await seedAgent({ companyId, status: "error", lastHeartbeatAt: recent }); + + const result = await heartbeat.recoverErroredAgents(now); + + expect(result).toEqual({ candidates: 0, recovered: 0 }); + const [after] = await db.select().from(agents).where(eq(agents.id, id)); + expect(after?.status).toBe("error"); + }); + + it("does not disturb non-error agents even when they are stale", async () => { + const companyId = await seedCompany(); + const now = new Date("2026-05-30T20:00:00.000Z"); + const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 3_600_000)); + const pausedId = await seedAgent({ + companyId, + status: "paused", + lastHeartbeatAt: stale, + pauseReason: "manual pause", + }); + const terminatedId = await seedAgent({ companyId, status: 
"terminated", lastHeartbeatAt: stale }); + const idleId = await seedAgent({ companyId, status: "idle", lastHeartbeatAt: stale }); + + const result = await heartbeat.recoverErroredAgents(now); + + expect(result).toEqual({ candidates: 0, recovered: 0 }); + const rows = await db.select().from(agents); + const byId = new Map(rows.map((row) => [row.id, row.status])); + expect(byId.get(pausedId)).toBe("paused"); + expect(byId.get(terminatedId)).toBe("terminated"); + expect(byId.get(idleId)).toBe("idle"); + }); + + it("treats missing lastHeartbeatAt by falling back to updatedAt", async () => { + const companyId = await seedCompany(); + const now = new Date("2026-05-30T20:00:00.000Z"); + const id = await seedAgent({ companyId, status: "error", lastHeartbeatAt: null }); + // Force updatedAt past the recovery floor so the coalesce branch triggers. + const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000)); + await db.update(agents).set({ updatedAt: stale }).where(eq(agents.id, id)); + + const result = await heartbeat.recoverErroredAgents(now); + + expect(result).toEqual({ candidates: 1, recovered: 1 }); + const [after] = await db.select().from(agents).where(eq(agents.id, id)); + expect(after?.status).toBe("idle"); + }); + + it("respects the configured batch limit", async () => { + const companyId = await seedCompany(); + const now = new Date("2026-05-30T20:00:00.000Z"); + const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000)); + for (let i = 0; i < 5; i += 1) { + await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale }); + } + + const result = await heartbeat.recoverErroredAgents(now, { limit: 2 }); + + expect(result).toEqual({ candidates: 2, recovered: 2 }); + const remaining = await db.select().from(agents).where(eq(agents.status, "error")); + expect(remaining).toHaveLength(3); + }); + + it("writes an activity-log entry per recovered agent", async () => { + const companyId = await seedCompany(); + const 
now = new Date("2026-05-30T20:00:00.000Z"); + const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000)); + const id = await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale }); + + await heartbeat.recoverErroredAgents(now); + + const entries = await db + .select() + .from(activityLog) + .where(eq(activityLog.agentId, id)); + expect(entries).toHaveLength(1); + expect(entries[0]).toMatchObject({ + action: "agent.recovered_from_error", + actorType: "system", + actorId: "sticky_error_recovery", + entityType: "agent", + entityId: id, + }); + }); + + it("is idempotent across back-to-back sweeps", async () => { + const companyId = await seedCompany(); + const now = new Date("2026-05-30T20:00:00.000Z"); + const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000)); + await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale }); + + const first = await heartbeat.recoverErroredAgents(now); + const second = await heartbeat.recoverErroredAgents(now); + + expect(first).toEqual({ candidates: 1, recovered: 1 }); + expect(second).toEqual({ candidates: 0, recovered: 0 }); + }); +}); diff --git a/server/src/index.ts b/server/src/index.ts index 38caf44a518..b793445e422 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -829,6 +829,24 @@ export async function startServer(): Promise { .catch((err) => { logger.error({ err }, "periodic heartbeat recovery failed"); }); + + // Un-stick agents that flipped to `status: "error"` (typically on an + // upstream quota window) and have not been touched for at least the + // recovery floor. Runs independently of the run/issue-driven chain + // above so a slow reaper can't starve quota-recovered agents. 
+ void heartbeat + .recoverErroredAgents(new Date()) + .then((result) => { + if (result.recovered > 0) { + logger.warn( + { ...result }, + "periodic sticky-error recovery flipped agents back to idle", + ); + } + }) + .catch((err) => { + logger.error({ err }, "periodic sticky-error recovery failed"); + }); }, config.heartbeatSchedulerIntervalMs); } diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 09a1e5fe7cb..3f70baebdf4 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -235,6 +235,14 @@ const MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP = 10; const MAX_TURN_CONTINUATION_DEFAULT_DELAY_MS = 1_000; const MAX_TURN_CONTINUATION_MAX_DELAY_MS = 5 * 60 * 1000; const MAX_TURN_CONTINUATION_LIVE_RUN_STATUSES = ["scheduled_retry", "queued", "running"] as const; + +// How long an agent must sit in `status: "error"` before the periodic recovery +// sweep flips it back to `idle`. Two hours sits well below the upstream quota +// reset cadence (Anthropic shared-subscription resets daily), so every quota +// window auto-recovers while real (non-quota) failures stay visible for 2h. +export const STICKY_ERROR_RECOVERY_MIN_AGE_MS = 2 * 60 * 60 * 1000; +const STICKY_ERROR_RECOVERY_DEFAULT_BATCH_LIMIT = 50; +const STICKY_ERROR_RECOVERY_ACTOR_ID = "sticky_error_recovery"; type CodexTransientFallbackMode = | "same_session" | "safer_invocation" @@ -10194,6 +10202,85 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) buildRunOutputSilence, + recoverErroredAgents: async ( + now: Date = new Date(), + opts: { minAgeMs?: number; limit?: number } = {}, + ) => { + const minAgeMs = Math.max(0, opts.minAgeMs ?? STICKY_ERROR_RECOVERY_MIN_AGE_MS); + const limit = Math.max(1, Math.min(opts.limit ?? 
STICKY_ERROR_RECOVERY_DEFAULT_BATCH_LIMIT, 500)); + const cutoff = new Date(now.getTime() - minAgeMs); + + const candidates = await db + .select({ + id: agents.id, + companyId: agents.companyId, + lastHeartbeatAt: agents.lastHeartbeatAt, + updatedAt: agents.updatedAt, + }) + .from(agents) + .where( + and( + eq(agents.status, "error"), + // Use lastHeartbeatAt as the staleness baseline; fall back to + // updatedAt for agents that errored before ever heartbeating. + sql`coalesce(${agents.lastHeartbeatAt}, ${agents.updatedAt}) <= ${cutoff.toISOString()}::timestamptz`, + ), + ) + .limit(limit); + + let recovered = 0; + for (const candidate of candidates) { + const [updated] = await db + .update(agents) + .set({ + status: "idle", + pauseReason: null, + pausedAt: null, + updatedAt: now, + }) + // Recheck status in the WHERE clause so a concurrent /resume or + // fresh heartbeat can't be clobbered by this sweep. + .where(and(eq(agents.id, candidate.id), eq(agents.status, "error"))) + .returning(); + if (!updated) continue; + recovered += 1; + + await logActivity(db, { + companyId: updated.companyId, + actorType: "system", + actorId: STICKY_ERROR_RECOVERY_ACTOR_ID, + agentId: updated.id, + runId: null, + action: "agent.recovered_from_error", + entityType: "agent", + entityId: updated.id, + details: { + source: "heartbeat.recoverErroredAgents", + minAgeMs, + cutoffIso: cutoff.toISOString(), + previousLastHeartbeatAt: candidate.lastHeartbeatAt + ? new Date(candidate.lastHeartbeatAt).toISOString() + : null, + }, + }); + + publishLiveEvent({ + companyId: updated.companyId, + type: "agent.status", + payload: { + agentId: updated.id, + status: updated.status, + lastHeartbeatAt: updated.lastHeartbeatAt + ? 
new Date(updated.lastHeartbeatAt).toISOString() + : null, + outcome: "sticky_error_recovered", + }, + }); + } + + return { candidates: candidates.length, recovered }; + }, + tickTimers: async (now = new Date()) => { const allAgents = await db.select().from(agents); let checked = 0;