Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 116 additions & 4 deletions agent-node/src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ const SCHEMA = `
id INTEGER PRIMARY KEY AUTOINCREMENT,
stream_id TEXT NOT NULL,
repository TEXT NOT NULL,
pr_number INTEGER NOT NULL,
pr_number INTEGER,
event_ref TEXT,
chain_id INTEGER NOT NULL,
chain_name TEXT NOT NULL,
tx_hash TEXT,
block_number INTEGER,
gas_used TEXT,
voucher_expiry INTEGER,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
UNIQUE (stream_id, repository, pr_number)
voucher_expiry INTEGER,
extension_seconds INTEGER,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
);

CREATE TABLE IF NOT EXISTS stream_registry (
Expand Down Expand Up @@ -96,6 +97,21 @@ const SCHEMA = `
account TEXT,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
);

-- Verified work earned but not yet applied on-chain. Each verified deliverable
-- is banked here first, then drawn down as the stream's runway + weekly cap
-- allow. Overflow (e.g. PRs beyond the weekly cap) carries into later weeks
-- instead of being lost.
CREATE TABLE IF NOT EXISTS banked_work (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stream_id TEXT NOT NULL,
source TEXT,
repository TEXT,
event_ref TEXT NOT NULL,
extension_seconds INTEGER NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
UNIQUE (stream_id, event_ref)
);
`;

/**
Expand Down Expand Up @@ -169,6 +185,46 @@ export async function initDb() {
try { await db.execute(sql); } catch { /* column already exists */ }
}

// ── processed_extensions rebuild ───────────────────────────────────────────
// Older DBs created pr_number as INTEGER NOT NULL with UNIQUE(stream_id,
// repository, pr_number). Webhook extensions pass prNumber=null, so the row
// violated NOT NULL and was silently dropped by INSERT OR IGNORE — extensions
// never reached the activity feed. Rebuild with pr_number nullable; event_ref
// is the real dedup key. Atomic via batch — rolls back if anything fails.
try {
const info = await db.execute('PRAGMA table_info(processed_extensions)');
const prCol = info.rows.find(r => r.name === 'pr_number');
if (prCol && Number(prCol.notnull) === 1) {
console.log('[db] Rebuilding processed_extensions (pr_number → nullable, event_ref key)…');
await db.batch([
`CREATE TABLE processed_extensions_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stream_id TEXT NOT NULL, repository TEXT NOT NULL,
pr_number INTEGER, event_ref TEXT,
chain_id INTEGER NOT NULL, chain_name TEXT NOT NULL,
tx_hash TEXT, block_number INTEGER, gas_used TEXT,
voucher_expiry INTEGER, extension_seconds INTEGER,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
)`,
`INSERT INTO processed_extensions_new
(id, stream_id, repository, pr_number, event_ref, chain_id, chain_name,
tx_hash, block_number, gas_used, voucher_expiry, extension_seconds, created_at)
SELECT id, stream_id, repository, pr_number, event_ref, chain_id, chain_name,
tx_hash, block_number, gas_used, voucher_expiry, extension_seconds, created_at
FROM processed_extensions`,
'DROP TABLE processed_extensions',
'ALTER TABLE processed_extensions_new RENAME TO processed_extensions',
], 'write');
console.log('[db] ✓ processed_extensions rebuilt');
}
} catch (err) {
console.warn('[db] processed_extensions rebuild skipped:', err.message);
}
// Dedup on the real key. SQLite allows multiple NULL event_refs (legacy rows).
try {
await db.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_proc_ext_event ON processed_extensions (stream_id, repository, event_ref)');
} catch { /* duplicate legacy data — replay guard still dedups */ }

console.log('[db] ✓ Schema initialized');
}

Expand Down Expand Up @@ -253,6 +309,62 @@ export async function getWeeklyExtendedSeconds(streamId, weekStartTime) {
return Number(result.rows[0]?.total ?? 0);
}

// ─── Banked Work ──────────────────────────────────────────────────────────────
// Verified deliverables earned but not yet applied on-chain. The agent drains
// these as the stream's runway and weekly cap allow, so bursty work (or work
// beyond the weekly cap) is carried forward rather than lost.

/** Queue a verified deliverable. Idempotent on (stream_id, event_ref). */
export async function bankWork({ streamId, source, repository, eventRef, extensionSeconds }) {
const db = getDb();
if (!db) return;
await db.execute({
sql: `INSERT OR IGNORE INTO banked_work (stream_id, source, repository, event_ref, extension_seconds)
VALUES (?, ?, ?, ?, ?)`,
args: [streamId, source ?? null, repository ?? null, String(eventRef), Number(extensionSeconds)],
});
}

/** Has this event already been banked (still waiting to apply)? */
export async function isWorkBanked(streamId, eventRef) {
const db = getDb();
if (!db) return false;
const r = await db.execute({
sql: 'SELECT 1 FROM banked_work WHERE stream_id = ? AND event_ref = ? LIMIT 1',
args: [streamId, String(eventRef)],
});
return r.rows.length > 0;
}

/** Banked entries for a stream, oldest first (FIFO drain order). */
export async function getBankedWork(streamId) {
const db = getDb();
if (!db) return [];
const r = await db.execute({
sql: 'SELECT * FROM banked_work WHERE stream_id = ? ORDER BY created_at ASC, id ASC',
args: [streamId],
});
return r.rows;
}

/** Remove a banked entry once it has been applied on-chain. */
export async function deleteBankedWork(id) {
const db = getDb();
if (!db) return;
await db.execute({ sql: 'DELETE FROM banked_work WHERE id = ?', args: [id] });
}

/** Registry rows for every stream that has at least one banked entry waiting. */
export async function getStreamsWithBankedWork() {
const db = getDb();
if (!db) return [];
const r = await db.execute(
`SELECT s.* FROM stream_registry s
WHERE s.stream_id IN (SELECT DISTINCT stream_id FROM banked_work)`,
);
return r.rows;
}

// ─── Stream Repo Lookup ───────────────────────────────────────────────────────

/**
Expand Down
19 changes: 15 additions & 4 deletions agent-node/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import helmet from 'helmet';
import rateLimit from 'express-rate-limit';

import { verifyMilestone, VerificationError } from './verifyMilestone.js';
import { verifyGitHubWebhook, verifyJiraWebhook, verifyBitbucketWebhook, verifyFigmaWebhook, extendFromEvent, checkStream } from './verificationEngine.js';
import { verifyGitHubWebhook, verifyJiraWebhook, verifyBitbucketWebhook, verifyFigmaWebhook, extendFromEvent, checkStream, drainAllBankedWork } from './verificationEngine.js';
import { signExtensionVoucher, getSignerAddress } from './agentSigner.js';
import { submitExtension, getAllBalances, readStreamBatch } from './chainSubmitter.js';
import { initDb, isAlreadyProcessed, recordExtension, getExtensionCount, registerStream, getStream, getStreamsByRepo, getStreamsBySource, getStreamsForAddress, getDb, upsertProfile, getProfile, getProfileByUsername, getProfileByApiKey, searchProfiles, isUsernameTaken, addToWaitlist, getWaitlistCount, saveOAuthTokens, disconnectOAuth, saveRepoInstallation, removeRepoInstallation, saveJiraWebhookIds, getProfileByJiraWebhookId, getInstallationIdForRepo } from './db.js';
import { initDb, isAlreadyProcessed, recordExtension, getExtensionCount, registerStream, getStream, getStreamsByRepo, getStreamsBySource, getStreamsForAddress, getDb, upsertProfile, getProfile, getProfileByUsername, getProfileByApiKey, searchProfiles, isUsernameTaken, addToWaitlist, getWaitlistCount, saveOAuthTokens, disconnectOAuth, saveRepoInstallation, removeRepoInstallation, saveJiraWebhookIds, getProfileByJiraWebhookId, getInstallationIdForRepo, getBankedWork } from './db.js';
import { publicProfile } from './encryption.js';
import publicApiRouter from './publicApi.js';
import { startStreamListeners } from './streamListener.js';
Expand Down Expand Up @@ -1527,15 +1527,18 @@ app.get('/api/v1/stream-status/:streamId', async (req, res) => {

try {
const db = getDb();
const [stream, extResult] = await Promise.all([
const [stream, extResult, banked] = await Promise.all([
getStream(streamId),
db.execute({
sql: 'SELECT * FROM processed_extensions WHERE stream_id = ? ORDER BY created_at DESC',
args: [streamId],
}),
getBankedWork(streamId),
]);

return res.json({ streamId, stream, extensions: extResult.rows });
// banked = verified deliverables earned but not yet applied on-chain (queued
// behind the runway / weekly cap). Returned newest-first to match extensions.
return res.json({ streamId, stream, extensions: extResult.rows, banked: [...banked].reverse() });
} catch (err) {
console.error('[stream-status] Error:', err);
return res.status(500).json({ error: 'Failed to fetch stream status' });
Expand Down Expand Up @@ -1877,4 +1880,12 @@ app.listen(PORT, async () => {
}

console.log('═══════════════════════════════════════════════════');

// Periodic banked-work drainer — applies earned work that couldn't be applied
// when it was verified (weekly cap hit, or stream still had runway) once a new
// week resets the cap or the runway frees up, even with no new webhook.
const DRAIN_INTERVAL_MS = parseInt(process.env.BANK_DRAIN_INTERVAL_MS ?? String(10 * 60 * 1000), 10);
setInterval(() => {
drainAllBankedWork().catch(err => console.error('[drain] sweep failed:', err.message));
}, DRAIN_INTERVAL_MS).unref();
});
Loading
Loading