diff --git a/apps/worker/src/jobs/proposals-archive.ts b/apps/worker/src/jobs/proposals-archive.ts new file mode 100644 index 0000000..35dc839 --- /dev/null +++ b/apps/worker/src/jobs/proposals-archive.ts @@ -0,0 +1,201 @@ +import type { MemoryStore } from '@colony/core'; + +export const PROPOSAL_ARCHIVE_INTERVAL_MS = 6 * 60 * 60_000; +export const STALE_PROPOSAL_AGE_MS = 7 * 24 * 60 * 60_000; +export const PROPOSAL_ARCHIVE_SESSION_ID = 'colony-worker:proposals-archive'; + +export interface ProposalArchiveJobResult { + archived_count: number; + archived_task_ids: number[]; +} + +export interface ProposalArchiveJobOptions { + now?: () => number; + staleAfterMs?: number; +} + +export interface ProposalArchiveLoopHandle { + stop: () => Promise; + lastResult: () => ProposalArchiveJobResult | null; + runNow: () => Promise; +} + +export interface ProposalArchiveLoopOptions extends ProposalArchiveJobOptions { + store: MemoryStore; + intervalMs?: number; + log?: (line: string) => void; +} + +interface SqlRunResult { + changes: number; + lastInsertRowid: number | bigint; +} + +interface SqlStatement { + all(...args: unknown[]): Array>; + get(...args: unknown[]): Record | undefined; + run(...args: unknown[]): SqlRunResult; +} + +interface SqlDatabase { + prepare(sql: string): SqlStatement; +} + +interface StorageWithDb { + db: SqlDatabase; +} + +interface ProposedTaskRow { + id: number; + title: string; + created_by: string; +} + +export function runProposalArchiveJob( + store: MemoryStore, + opts: ProposalArchiveJobOptions = {}, +): ProposalArchiveJobResult { + const now = opts.now?.() ?? Date.now(); + const cutoff = now - (opts.staleAfterMs ?? STALE_PROPOSAL_AGE_MS); + const db = rawDb(store); + return store.storage.transaction( + () => { + store.storage.createSession({ + id: PROPOSAL_ARCHIVE_SESSION_ID, + ide: 'worker', + cwd: null, + started_at: now, + metadata: null, + }); + + const stale = db + .prepare( + `SELECT id, title, created_by + FROM tasks + WHERE proposal_status = 'proposed' + AND created_at < ? + ORDER BY created_at ASC, id ASC`, + ) + .all(cutoff) + .map(normalizeTaskRow); + const archivedTaskIds: number[] = []; + + for (const task of stale) { + const result = db + .prepare( + `UPDATE tasks + SET proposal_status = 'archived', + updated_at = ? + WHERE id = ? + AND proposal_status = 'proposed'`, + ) + .run(now, task.id); + if (result.changes === 0) continue; + + db.prepare( + `UPDATE agent_profiles + SET open_proposal_count = CASE + WHEN open_proposal_count > 0 THEN open_proposal_count - 1 + ELSE 0 + END, + updated_at = ? + WHERE agent = ?`, + ).run(now, task.created_by); + store.storage.insertObservation({ + session_id: PROPOSAL_ARCHIVE_SESSION_ID, + kind: 'proposal-auto-archived', + content: `Auto-archived stale proposal task ${task.id}: ${task.title}`, + compressed: false, + intensity: null, + ts: now, + task_id: task.id, + metadata: { + proposer: task.created_by, + stale_after_ms: opts.staleAfterMs ?? STALE_PROPOSAL_AGE_MS, + }, + }); + archivedTaskIds.push(task.id); + } + + return { + archived_count: archivedTaskIds.length, + archived_task_ids: archivedTaskIds, + }; + }, + { immediate: true }, + ); +} + +export function startProposalArchiveJobLoop( + opts: ProposalArchiveLoopOptions, +): ProposalArchiveLoopHandle { + const { store } = opts; + const intervalMs = opts.intervalMs ?? PROPOSAL_ARCHIVE_INTERVAL_MS; + const log = opts.log ?? ((line: string) => process.stderr.write(`${line}\n`)); + let stopped = false; + let inFlight: Promise | null = null; + let latest: ProposalArchiveJobResult | null = null; + + const runOnce = (): ProposalArchiveJobResult => { + const result = runProposalArchiveJob(store, opts); + latest = result; + logRun(log, result); + return result; + }; + + const tick = async (): Promise => { + if (stopped) return; + try { + return runOnce(); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log(JSON.stringify({ component: 'colony-worker', job: 'proposals-archive', error: message })); + } + }; + + let timer: NodeJS.Timeout | null = null; + if (intervalMs !== 0) { + const firstRunDelay = Math.min(5_000, intervalMs); + timer = setTimeout(function loop() { + if (stopped) return; + inFlight = tick(); + void inFlight.finally(() => { + if (stopped) return; + timer = setTimeout(loop, intervalMs); + }); + }, firstRunDelay); + } + + return { + async stop() { + stopped = true; + if (timer) clearTimeout(timer); + if (inFlight) await inFlight; + }, + lastResult: () => latest, + runNow: async () => runOnce(), + }; +} + +function rawDb(store: MemoryStore): SqlDatabase { + return (store.storage as unknown as StorageWithDb).db; +} + +function normalizeTaskRow(row: Record): ProposedTaskRow { + return { + id: typeof row.id === 'number' ? row.id : 0, + title: typeof row.title === 'string' ? row.title : '', + created_by: typeof row.created_by === 'string' ? row.created_by : '', + }; +} + +function logRun(log: (line: string) => void, result: ProposalArchiveJobResult): void { + log( + JSON.stringify({ + component: 'colony-worker', + job: 'proposals-archive', + archived_count: result.archived_count, + archived_task_ids: result.archived_task_ids, + }), + ); +} diff --git a/apps/worker/src/server.ts b/apps/worker/src/server.ts index c5bb31a..0494b54 100644 --- a/apps/worker/src/server.ts +++ b/apps/worker/src/server.ts @@ -24,6 +24,10 @@ import { startCoordinationSweepLoop, } from './coordination-sweep-loop.js'; import { type EmbedLoopHandle, startEmbedLoop, stateFilePath } from './embed-loop.js'; +import { + type ProposalArchiveLoopHandle, + startProposalArchiveJobLoop, +} from './jobs/proposals-archive.js'; import { type RescueLoopHandle, startRescueLoop } from './rescue-loop.js'; import { type StrandedSessionSummary, @@ -597,6 +601,7 @@ export async function start(): Promise { const handles: { rescueLoop?: RescueLoopHandle; coordinationSweepLoop?: CoordinationSweepLoopHandle; + proposalArchiveLoop?: ProposalArchiveLoopHandle; } = {}; const servers: Array> = []; @@ -605,6 +610,7 @@ export async function start(): Promise { caffeinate?.stop(); if (handles.rescueLoop) await handles.rescueLoop.stop(); if (handles.coordinationSweepLoop) await handles.coordinationSweepLoop.stop(); + if (handles.proposalArchiveLoop) await handles.proposalArchiveLoop.stop(); if (loop) await loop.stop(); for (const s of servers) s.close(); store.close(); @@ -692,6 +698,11 @@ export async function start(): Promise { log: (line) => process.stderr.write(`${line}\n`), }); + handles.proposalArchiveLoop = startProposalArchiveJobLoop({ + store, + log: (line) => process.stderr.write(`${line}\n`), + }); + const app = buildApp(store, loop, { rescueLoop: handles.rescueLoop, fileHeatHalfLifeMinutes: settings.fileHeatHalfLifeMinutes, diff --git a/apps/worker/test/jobs/proposals-archive.test.ts b/apps/worker/test/jobs/proposals-archive.test.ts new file mode 100644 index 0000000..34ce38f --- /dev/null +++ b/apps/worker/test/jobs/proposals-archive.test.ts @@ -0,0 +1,155 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { defaultSettings } from '@colony/config'; +import { MemoryStore } from '@colony/core'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + STALE_PROPOSAL_AGE_MS, + runProposalArchiveJob, + startProposalArchiveJobLoop, +} from '../../src/jobs/proposals-archive.js'; + +interface SqlRunResult { + changes: number; + lastInsertRowid: number | bigint; +} + +interface SqlStatement { + all(...args: unknown[]): Array>; + get(...args: unknown[]): Record | undefined; + run(...args: unknown[]): SqlRunResult; +} + +interface SqlDatabase { + prepare(sql: string): SqlStatement; +} + +interface StorageWithDb { + db: SqlDatabase; +} + +let dir: string; +let store: MemoryStore; +let db: SqlDatabase; + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), 'colony-worker-proposals-archive-')); + store = new MemoryStore({ dbPath: join(dir, 'data.db'), settings: defaultSettings }); + db = (store.storage as unknown as StorageWithDb).db; +}); + +afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +describe('proposal archive job', () => { + it('archives proposed tasks older than seven days and decrements the proposer count', () => { + const now = 1_800_000_000_000; + seedProfile('scout-A', 2); + const staleTaskId = seedProposalTask({ + title: 'old proposal', + branch: 'proposal/old', + createdBy: 'scout-A', + createdAt: now - STALE_PROPOSAL_AGE_MS - 60_000, + }); + const freshTaskId = seedProposalTask({ + title: 'fresh proposal', + branch: 'proposal/fresh', + createdBy: 'scout-A', + createdAt: now - 60_000, + }); + + const result = runProposalArchiveJob(store, { now: () => now }); + + expect(result).toEqual({ archived_count: 1, archived_task_ids: [staleTaskId] }); + expect(taskStatus(staleTaskId)).toBe('archived'); + expect(taskStatus(freshTaskId)).toBe('proposed'); + expect(profileCount('scout-A')).toBe(1); + const observations = store.storage.taskObservationsByKind( + staleTaskId, + 'proposal-auto-archived', + 10, + ); + expect(observations).toHaveLength(1); + expect(observations[0]?.content).toContain('Auto-archived stale proposal task'); + expect(JSON.parse(observations[0]?.metadata ?? '{}')).toMatchObject({ + proposer: 'scout-A', + stale_after_ms: STALE_PROPOSAL_AGE_MS, + }); + }); + + it('runNow logs structured JSON for the scheduled worker surface', async () => { + const now = 1_800_000_000_000; + seedProfile('scout-A', 1); + const staleTaskId = seedProposalTask({ + title: 'old proposal', + branch: 'proposal/logged', + createdBy: 'scout-A', + createdAt: now - STALE_PROPOSAL_AGE_MS - 60_000, + }); + const logs: string[] = []; + const handle = startProposalArchiveJobLoop({ + store, + intervalMs: 0, + now: () => now, + log: (line) => logs.push(line), + }); + + await handle.runNow(); + await handle.stop(); + + expect(JSON.parse(logs[0] ?? '{}')).toEqual({ + component: 'colony-worker', + job: 'proposals-archive', + archived_count: 1, + archived_task_ids: [staleTaskId], + }); + }); +}); + +function seedProfile(agent: string, openProposalCount: number): void { + store.storage.upsertAgentProfile({ + agent, + capabilities: '{}', + role: 'scout', + open_proposal_count: openProposalCount, + updated_at: 1, + }); +} + +function seedProposalTask(args: { + title: string; + branch: string; + createdBy: string; + createdAt: number; +}): number { + const result = db + .prepare( + `INSERT INTO tasks( + title, repo_root, branch, status, created_by, created_at, updated_at, + proposal_status, approved_by, observation_evidence_ids + ) VALUES (?, ?, ?, 'open', ?, ?, ?, 'proposed', NULL, ?)`, + ) + .run( + args.title, + '/repo', + args.branch, + args.createdBy, + args.createdAt, + args.createdAt, + JSON.stringify([101]), + ); + return Number(result.lastInsertRowid); +} + +function taskStatus(taskId: number): string | null { + const row = db.prepare('SELECT proposal_status FROM tasks WHERE id = ?').get(taskId); + return typeof row?.proposal_status === 'string' ? row.proposal_status : null; +} + +function profileCount(agent: string): number { + const row = store.storage.getAgentProfile(agent); + return row?.open_proposal_count ?? 0; +} diff --git a/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/.openspec.yaml b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/.openspec.yaml new file mode 100644 index 0000000..9f70866 --- /dev/null +++ b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-15 diff --git a/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/proposal.md b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/proposal.md new file mode 100644 index 0000000..4efc94e --- /dev/null +++ b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/proposal.md @@ -0,0 +1,16 @@ +## Why + +- Proposed task threads can remain in `proposal_status='proposed'` forever when no queen/operator approves them. The worker needs an automated cleanup path so stale proposals stop inflating scout open-proposal counts and cluttering ready-work surfaces. + +## What Changes + +- Add a worker job that archives proposed tasks older than seven days. +- Decrement the proposing agent's `open_proposal_count` when an old proposal is archived. +- Record a task-scoped `proposal-auto-archived` observation and structured worker log output for auditability. +- Register the job in the worker runtime on a six-hour cadence. + +## Impact + +- Affected surface: `@colony/worker` startup and proposal task-thread cleanup. +- Risk is limited to rows already marked `proposal_status='proposed'` and older than the retention window. +- Focused worker tests cover stale-vs-fresh behavior, count decrementing, observation emission, and structured logging. diff --git a/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/specs/worker-stale-proposal-archive-job/spec.md b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/specs/worker-stale-proposal-archive-job/spec.md new file mode 100644 index 0000000..276dd90 --- /dev/null +++ b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/specs/worker-stale-proposal-archive-job/spec.md @@ -0,0 +1,23 @@ +## ADDED Requirements + +### Requirement: Worker archives stale proposed task threads +The worker SHALL archive task rows with `proposal_status='proposed'` once they are older than seven days. + +#### Scenario: Old proposed task is archived +- **GIVEN** a task row with `proposal_status='proposed'` and `created_at` more than seven days in the past +- **WHEN** the stale proposal archive job runs +- **THEN** the task row's `proposal_status` is set to `archived` +- **AND** the proposing agent's `open_proposal_count` is decremented without going below zero +- **AND** a `proposal-auto-archived` observation is recorded on the task. + +#### Scenario: Fresh proposed task remains proposed +- **GIVEN** a task row with `proposal_status='proposed'` and `created_at` within the last seven days +- **WHEN** the stale proposal archive job runs +- **THEN** the task row remains `proposal_status='proposed'`. + +### Requirement: Worker schedules stale proposal archival +The worker runtime SHALL schedule the stale proposal archive job on a six-hour interval and emit structured JSON log lines for each run. + +#### Scenario: Scheduled surface logs archive result +- **WHEN** the worker invokes the stale proposal archive job +- **THEN** it emits a JSON log line containing the job name, archived count, and archived task ids. diff --git a/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/tasks.md b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/tasks.md new file mode 100644 index 0000000..2ae3309 --- /dev/null +++ b/openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/tasks.md @@ -0,0 +1,34 @@ +## Definition of Done + +This change is complete only when **all** of the following are true: + +- Every checkbox below is checked. +- The agent branch reaches `MERGED` state on `origin` and the PR URL + state are recorded in the completion handoff. +- If any step blocks (test failure, conflict, ambiguous result), append a `BLOCKED:` line under section 4 explaining the blocker and **STOP**. Do not tick remaining cleanup boxes; do not silently skip the cleanup pipeline. + +## Handoff + +- Handoff: change=`agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03`; branch=`agent/codex/worker-stale-proposal-archive-job-2026-05-15-12-03`; scope=`worker stale proposal archive job`; action=`finish via Guardex PR flow after verification`. +- Copy prompt: Continue `agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03` on branch `agent/codex/worker-stale-proposal-archive-job-2026-05-15-12-03`. Work inside the existing sandbox, review `openspec/changes/agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03/tasks.md`, continue from the current state instead of creating a new sandbox, and when the work is done run `gx branch finish --branch agent/codex/worker-stale-proposal-archive-job-2026-05-15-12-03 --base main --via-pr --cleanup`. + +## 1. Specification + +- [x] 1.1 Finalize proposal scope and acceptance criteria for `agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03`. +- [x] 1.2 Define normative requirements in `specs/worker-stale-proposal-archive-job/spec.md`. + +## 2. Implementation + +- [x] 2.1 Implement scoped behavior changes. +- [x] 2.2 Add/update focused regression coverage. + +## 3. Verification + +- [x] 3.1 Run targeted project verification commands. +- [x] 3.2 Run `openspec validate agent-codex-worker-stale-proposal-archive-job-2026-05-15-12-03 --type change --strict`. +- [x] 3.3 Run `openspec validate --specs`. + +## 4. Cleanup (mandatory; run before claiming completion) + +- [ ] 4.1 Run the cleanup pipeline: `gx branch finish --branch agent/codex/worker-stale-proposal-archive-job-2026-05-15-12-03 --base main --via-pr --cleanup`. This handles commit -> push -> PR create -> merge wait -> worktree prune in one invocation. +- [ ] 4.2 Record the PR URL and final merge state (`MERGED`) in the completion handoff. +- [ ] 4.3 Confirm the sandbox worktree is gone (`git worktree list` no longer shows the agent path; `git branch -a` shows no surviving local/remote refs for the branch).