Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions apps/worker/src/jobs/proposals-archive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import type { MemoryStore } from '@colony/core';

export const PROPOSAL_ARCHIVE_INTERVAL_MS = 6 * 60 * 60_000;
export const STALE_PROPOSAL_AGE_MS = 7 * 24 * 60 * 60_000;
export const PROPOSAL_ARCHIVE_SESSION_ID = 'colony-worker:proposals-archive';

export interface ProposalArchiveJobResult {
archived_count: number;
archived_task_ids: number[];
}

export interface ProposalArchiveJobOptions {
now?: () => number;
staleAfterMs?: number;
}

export interface ProposalArchiveLoopHandle {
stop: () => Promise<void>;
lastResult: () => ProposalArchiveJobResult | null;
runNow: () => Promise<ProposalArchiveJobResult>;
}

export interface ProposalArchiveLoopOptions extends ProposalArchiveJobOptions {
store: MemoryStore;
intervalMs?: number;
log?: (line: string) => void;
}

interface SqlRunResult {
changes: number;
lastInsertRowid: number | bigint;
}

interface SqlStatement {
all(...args: unknown[]): Array<Record<string, unknown>>;
get(...args: unknown[]): Record<string, unknown> | undefined;
run(...args: unknown[]): SqlRunResult;
}

interface SqlDatabase {
prepare(sql: string): SqlStatement;
}

interface StorageWithDb {
db: SqlDatabase;
}

interface ProposedTaskRow {
id: number;
title: string;
created_by: string;
}

export function runProposalArchiveJob(
store: MemoryStore,
opts: ProposalArchiveJobOptions = {},
): ProposalArchiveJobResult {
const now = opts.now?.() ?? Date.now();
const cutoff = now - (opts.staleAfterMs ?? STALE_PROPOSAL_AGE_MS);
const db = rawDb(store);
return store.storage.transaction(
() => {
store.storage.createSession({
id: PROPOSAL_ARCHIVE_SESSION_ID,
ide: 'worker',
cwd: null,
started_at: now,
metadata: null,
});

const stale = db
.prepare(
`SELECT id, title, created_by
FROM tasks
WHERE proposal_status = 'proposed'
AND created_at < ?
ORDER BY created_at ASC, id ASC`,
)
.all(cutoff)
.map(normalizeTaskRow);
const archivedTaskIds: number[] = [];

for (const task of stale) {
const result = db
.prepare(
`UPDATE tasks
SET proposal_status = 'archived',
updated_at = ?
WHERE id = ?
AND proposal_status = 'proposed'`,
)
.run(now, task.id);
if (result.changes === 0) continue;

db.prepare(
`UPDATE agent_profiles
SET open_proposal_count = CASE
WHEN open_proposal_count > 0 THEN open_proposal_count - 1
ELSE 0
END,
updated_at = ?
WHERE agent = ?`,
).run(now, task.created_by);
store.storage.insertObservation({
session_id: PROPOSAL_ARCHIVE_SESSION_ID,
kind: 'proposal-auto-archived',
content: `Auto-archived stale proposal task ${task.id}: ${task.title}`,
compressed: false,
intensity: null,
ts: now,
task_id: task.id,
metadata: {
proposer: task.created_by,
stale_after_ms: opts.staleAfterMs ?? STALE_PROPOSAL_AGE_MS,
},
});
archivedTaskIds.push(task.id);
}

return {
archived_count: archivedTaskIds.length,
archived_task_ids: archivedTaskIds,
};
},
{ immediate: true },
);
}

export function startProposalArchiveJobLoop(
opts: ProposalArchiveLoopOptions,
): ProposalArchiveLoopHandle {
const { store } = opts;
const intervalMs = opts.intervalMs ?? PROPOSAL_ARCHIVE_INTERVAL_MS;
const log = opts.log ?? ((line: string) => process.stderr.write(`${line}\n`));
let stopped = false;
let inFlight: Promise<ProposalArchiveJobResult | undefined> | null = null;
let latest: ProposalArchiveJobResult | null = null;

const runOnce = (): ProposalArchiveJobResult => {
const result = runProposalArchiveJob(store, opts);
latest = result;
logRun(log, result);
return result;
};

const tick = async (): Promise<ProposalArchiveJobResult | undefined> => {
if (stopped) return;
try {
return runOnce();
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
log(JSON.stringify({ component: 'colony-worker', job: 'proposals-archive', error: message }));
}
};

let timer: NodeJS.Timeout | null = null;
if (intervalMs !== 0) {
const firstRunDelay = Math.min(5_000, intervalMs);
timer = setTimeout(function loop() {
if (stopped) return;
inFlight = tick();
void inFlight.finally(() => {
if (stopped) return;
timer = setTimeout(loop, intervalMs);
});
}, firstRunDelay);
}

return {
async stop() {
stopped = true;
if (timer) clearTimeout(timer);
if (inFlight) await inFlight;
},
lastResult: () => latest,
runNow: async () => runOnce(),
};
}

function rawDb(store: MemoryStore): SqlDatabase {
return (store.storage as unknown as StorageWithDb).db;
}

function normalizeTaskRow(row: Record<string, unknown>): ProposedTaskRow {
return {
id: typeof row.id === 'number' ? row.id : 0,
title: typeof row.title === 'string' ? row.title : '',
created_by: typeof row.created_by === 'string' ? row.created_by : '',
};
}

function logRun(log: (line: string) => void, result: ProposalArchiveJobResult): void {
log(
JSON.stringify({
component: 'colony-worker',
job: 'proposals-archive',
archived_count: result.archived_count,
archived_task_ids: result.archived_task_ids,
}),
);
}
11 changes: 11 additions & 0 deletions apps/worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import {
startCoordinationSweepLoop,
} from './coordination-sweep-loop.js';
import { type EmbedLoopHandle, startEmbedLoop, stateFilePath } from './embed-loop.js';
import {
type ProposalArchiveLoopHandle,
startProposalArchiveJobLoop,
} from './jobs/proposals-archive.js';
import { type RescueLoopHandle, startRescueLoop } from './rescue-loop.js';
import {
type StrandedSessionSummary,
Expand Down Expand Up @@ -597,6 +601,7 @@ export async function start(): Promise<void> {
const handles: {
rescueLoop?: RescueLoopHandle;
coordinationSweepLoop?: CoordinationSweepLoopHandle;
proposalArchiveLoop?: ProposalArchiveLoopHandle;
} = {};
const servers: Array<ReturnType<typeof serve>> = [];

Expand All @@ -605,6 +610,7 @@ export async function start(): Promise<void> {
caffeinate?.stop();
if (handles.rescueLoop) await handles.rescueLoop.stop();
if (handles.coordinationSweepLoop) await handles.coordinationSweepLoop.stop();
if (handles.proposalArchiveLoop) await handles.proposalArchiveLoop.stop();
if (loop) await loop.stop();
for (const s of servers) s.close();
store.close();
Expand Down Expand Up @@ -692,6 +698,11 @@ export async function start(): Promise<void> {
log: (line) => process.stderr.write(`${line}\n`),
});

handles.proposalArchiveLoop = startProposalArchiveJobLoop({
store,
log: (line) => process.stderr.write(`${line}\n`),
});

const app = buildApp(store, loop, {
rescueLoop: handles.rescueLoop,
fileHeatHalfLifeMinutes: settings.fileHeatHalfLifeMinutes,
Expand Down
155 changes: 155 additions & 0 deletions apps/worker/test/jobs/proposals-archive.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { defaultSettings } from '@colony/config';
import { MemoryStore } from '@colony/core';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import {
STALE_PROPOSAL_AGE_MS,
runProposalArchiveJob,
startProposalArchiveJobLoop,
} from '../../src/jobs/proposals-archive.js';

interface SqlRunResult {
changes: number;
lastInsertRowid: number | bigint;
}

interface SqlStatement {
all(...args: unknown[]): Array<Record<string, unknown>>;
get(...args: unknown[]): Record<string, unknown> | undefined;
run(...args: unknown[]): SqlRunResult;
}

interface SqlDatabase {
prepare(sql: string): SqlStatement;
}

interface StorageWithDb {
db: SqlDatabase;
}

let dir: string;
let store: MemoryStore;
let db: SqlDatabase;

beforeEach(() => {
dir = mkdtempSync(join(tmpdir(), 'colony-worker-proposals-archive-'));
store = new MemoryStore({ dbPath: join(dir, 'data.db'), settings: defaultSettings });
db = (store.storage as unknown as StorageWithDb).db;
});

afterEach(() => {
store.close();
rmSync(dir, { recursive: true, force: true });
});

describe('proposal archive job', () => {
it('archives proposed tasks older than seven days and decrements the proposer count', () => {
const now = 1_800_000_000_000;
seedProfile('scout-A', 2);
const staleTaskId = seedProposalTask({
title: 'old proposal',
branch: 'proposal/old',
createdBy: 'scout-A',
createdAt: now - STALE_PROPOSAL_AGE_MS - 60_000,
});
const freshTaskId = seedProposalTask({
title: 'fresh proposal',
branch: 'proposal/fresh',
createdBy: 'scout-A',
createdAt: now - 60_000,
});

const result = runProposalArchiveJob(store, { now: () => now });

expect(result).toEqual({ archived_count: 1, archived_task_ids: [staleTaskId] });
expect(taskStatus(staleTaskId)).toBe('archived');
expect(taskStatus(freshTaskId)).toBe('proposed');
expect(profileCount('scout-A')).toBe(1);
const observations = store.storage.taskObservationsByKind(
staleTaskId,
'proposal-auto-archived',
10,
);
expect(observations).toHaveLength(1);
expect(observations[0]?.content).toContain('Auto-archived stale proposal task');
expect(JSON.parse(observations[0]?.metadata ?? '{}')).toMatchObject({
proposer: 'scout-A',
stale_after_ms: STALE_PROPOSAL_AGE_MS,
});
});

it('runNow logs structured JSON for the scheduled worker surface', async () => {
const now = 1_800_000_000_000;
seedProfile('scout-A', 1);
const staleTaskId = seedProposalTask({
title: 'old proposal',
branch: 'proposal/logged',
createdBy: 'scout-A',
createdAt: now - STALE_PROPOSAL_AGE_MS - 60_000,
});
const logs: string[] = [];
const handle = startProposalArchiveJobLoop({
store,
intervalMs: 0,
now: () => now,
log: (line) => logs.push(line),
});

await handle.runNow();
await handle.stop();

expect(JSON.parse(logs[0] ?? '{}')).toEqual({
component: 'colony-worker',
job: 'proposals-archive',
archived_count: 1,
archived_task_ids: [staleTaskId],
});
});
});

function seedProfile(agent: string, openProposalCount: number): void {
store.storage.upsertAgentProfile({
agent,
capabilities: '{}',
role: 'scout',
open_proposal_count: openProposalCount,
updated_at: 1,
});
}

function seedProposalTask(args: {
title: string;
branch: string;
createdBy: string;
createdAt: number;
}): number {
const result = db
.prepare(
`INSERT INTO tasks(
title, repo_root, branch, status, created_by, created_at, updated_at,
proposal_status, approved_by, observation_evidence_ids
) VALUES (?, ?, ?, 'open', ?, ?, ?, 'proposed', NULL, ?)`,
)
.run(
args.title,
'/repo',
args.branch,
args.createdBy,
args.createdAt,
args.createdAt,
JSON.stringify([101]),
);
return Number(result.lastInsertRowid);
}

function taskStatus(taskId: number): string | null {
const row = db.prepare('SELECT proposal_status FROM tasks WHERE id = ?').get(taskId);
return typeof row?.proposal_status === 'string' ? row.proposal_status : null;
}

function profileCount(agent: string): number {
const row = store.storage.getAgentProfile(agent);
return row?.open_proposal_count ?? 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-05-15
Loading
Loading