From ebe4e3ce033fd4fe65f09f6d87fa903358b1a737 Mon Sep 17 00:00:00 2001 From: gok03 Date: Wed, 10 Jun 2026 13:53:03 +0530 Subject: [PATCH] fix(ingest): unstick OSV rotation when caught up + per-source bootstrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs surfaced once the v0.1.2 progress bars made the ingest behavior visible: 1. **OSV rotation stuck on first ecosystem.** The previous policy was `advance = stopped ? 0 : 1`. On a persistent volume where the npm watermark was already up to date, every cron tick spent the full 60 s budget reading the ~200 MB npm zip rejecting every record on the watermark, producing `processed = 0` but `stopped = true` — so `advance = 0` and the rotation re-attempted npm forever. PyPI, Maven, distros never got a turn. Fix: `advance = stopped && processed > 0 ? 0 : 1`. When we burn the budget without processing a single record, we've caught up — move on instead of looping. Log line distinguishes "budget hit, resumes next tick" from the new "caught up, advancing rotation" case. 2. **Bootstrap-on-empty missed upgrade paths.** Checking only `vulnerabilities COUNT == 0` meant a v0.1.0 → v0.1.2 upgrade — where the volume already has OSV data but KEV/EPSS/GHSA/Wolfi never ran — skipped the enrichment bootstrap entirely, leaving /readyz stuck on `pending_sources: ["kev","epss","ghsa_direct","wolfi"]` until the daily 5 am UTC cron. Fix: check per-source `ingestion_state.last_ok_at IS NULL` and kick each job whose source hasn't completed a successful run. OSV stays keyed on the vuln-table-empty check OR `last_ok_at IS NULL` for the osv row. New banner lists what's being kicked: refuse: bootstrap — kicking enrichment(kev,epss,ghsa_direct,wolfi) in parallel. Watch /readyz for progress. --- apps/server/src/ingest/cron.ts | 20 +++++++++------ apps/server/src/ingest/scheduler.ts | 38 ++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/apps/server/src/ingest/cron.ts b/apps/server/src/ingest/cron.ts index 54c18b7..0beb4a5 100644 --- a/apps/server/src/ingest/cron.ts +++ b/apps/server/src/ingest/cron.ts @@ -229,18 +229,24 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis ); } - // If we exited on the time budget, leave next_index pointing back at this - // ecosystem so the next cron tick resumes from the same watermark. - const advance = stopped ? 0 : 1; + // Rotation policy: + // stopped + processed > 0 → mid-work, stay here next tick to finish + // stopped + processed == 0 → caught up (read the whole zip, rejected + // every record on the watermark), advance to avoid an infinite stall + // re-reading the same archive forever + // not stopped → finished naturally, advance + const advance = stopped && processed > 0 ? 0 : 1; const nextCursor: OsvCursor = { next_index: (cursor.next_index + advance) % ROTATION.length, watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }}; await recordIngestionState(db, "osv", "ok", processed, { lastModified: JSON.stringify(nextCursor)}); - logIngest( - tag, - `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${stopped ? " (budget hit, resumes next tick)" : ""}`, - ); + const tail = !stopped + ? "" + : processed > 0 + ? " (budget hit, resumes next tick)" + : " (caught up, advancing rotation)"; + logIngest(tag, `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${tail}`); } catch (e) { // Try to flush whatever made it through before the failure so we don't // re-process those records on the next run. diff --git a/apps/server/src/ingest/scheduler.ts b/apps/server/src/ingest/scheduler.ts index 0b221cc..9b98cc6 100644 --- a/apps/server/src/ingest/scheduler.ts +++ b/apps/server/src/ingest/scheduler.ts @@ -152,25 +152,39 @@ export function buildScheduler( `refuse: cron started — osv every ${config.REFUSE_OSV_FREQUENCY}m, deps.dev every ${config.REFUSE_DEPS_DEV_FREQUENCY}m, enrichment "${config.REFUSE_ENRICHMENT_CRON}"`, ); - // First-boot bootstrap: if the vulnerabilities table is empty, kick all - // three jobs in parallel so the server has data within minutes instead - // of waiting up to 24h for the enrichment cron tick. Each runs under its - // own concurrency guard so the cron tick that fires later just no-ops. + // First-boot bootstrap: kick each job whose source has never completed + // a successful run. Decoupling per-source means an upgrade from an + // older image — where OSV ran but the enrichment cron hasn't fired yet + // — still backfills KEV/EPSS/GHSA/Wolfi instead of waiting up to 24h. + // Each job runs under its own concurrency guard so the regular cron + // tick that fires later just no-ops. if (config.REFUSE_BOOTSTRAP_ON_EMPTY) { - const row = rawDb + const done = readSourcesDone(rawDb); + const vulnRow = rawDb .prepare(`SELECT COUNT(*) AS n FROM vulnerabilities`) .get() as { n: number } | undefined; - if (!row || row.n === 0) { - console.log( - `refuse: empty vulnerabilities table — kicking initial pass on all sources in parallel ` + - `(osv across ${OSV_ROTATION_TOTAL} ecosystems @ 1/tick, kev, epss, ghsa, wolfi). ` + - `Watch /readyz for progress.`, - ); - // Fire-and-forget; each job's logging lives inside makeJob. + const vulnsEmpty = !vulnRow || vulnRow.n === 0; + const enrichmentSources = ["kev", "epss", "ghsa_direct", "wolfi"] as const; + const missingEnrichment = enrichmentSources.filter((s) => !done.has(s)); + + const kicks: string[] = []; + if (vulnsEmpty || !done.has("osv")) { + kicks.push("osv"); osvJob().catch(() => {}); + } + if (!done.has("deps_dev")) { + kicks.push("deps-dev"); depsDevJob().catch(() => {}); + } + if (missingEnrichment.length > 0) { + kicks.push(`enrichment(${missingEnrichment.join(",")})`); enrichJob().catch(() => {}); } + if (kicks.length > 0) { + console.log( + `refuse: bootstrap — kicking ${kicks.join(" + ")} in parallel. Watch /readyz for progress.`, + ); + } } }, stop(): void {