diff --git a/apps/server/src/ingest/cron.ts b/apps/server/src/ingest/cron.ts index 54c18b7..0beb4a5 100644 --- a/apps/server/src/ingest/cron.ts +++ b/apps/server/src/ingest/cron.ts @@ -229,18 +229,24 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis ); } - // If we exited on the time budget, leave next_index pointing back at this - // ecosystem so the next cron tick resumes from the same watermark. - const advance = stopped ? 0 : 1; + // Rotation policy: + // stopped + processed > 0 → mid-work, stay here next tick to finish + // stopped + processed == 0 → caught up (read the whole zip, rejected + // every record on the watermark), advance to avoid an infinite stall + // re-reading the same archive forever + // not stopped → finished naturally, advance + const advance = stopped && processed > 0 ? 0 : 1; const nextCursor: OsvCursor = { next_index: (cursor.next_index + advance) % ROTATION.length, watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }}; await recordIngestionState(db, "osv", "ok", processed, { lastModified: JSON.stringify(nextCursor)}); - logIngest( - tag, - `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${stopped ? " (budget hit, resumes next tick)" : ""}`, - ); + const tail = !stopped + ? "" + : processed > 0 + ? " (budget hit, resumes next tick)" + : " (caught up, advancing rotation)"; + logIngest(tag, `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${tail}`); } catch (e) { // Try to flush whatever made it through before the failure so we don't // re-process those records on the next run. diff --git a/apps/server/src/ingest/scheduler.ts b/apps/server/src/ingest/scheduler.ts index 0b221cc..9b98cc6 100644 --- a/apps/server/src/ingest/scheduler.ts +++ b/apps/server/src/ingest/scheduler.ts @@ -152,25 +152,39 @@ export function buildScheduler( `refuse: cron started — osv every ${config.REFUSE_OSV_FREQUENCY}m, deps.dev every ${config.REFUSE_DEPS_DEV_FREQUENCY}m, enrichment "${config.REFUSE_ENRICHMENT_CRON}"`, ); - // First-boot bootstrap: if the vulnerabilities table is empty, kick all - // three jobs in parallel so the server has data within minutes instead - // of waiting up to 24h for the enrichment cron tick. Each runs under its - // own concurrency guard so the cron tick that fires later just no-ops. + // First-boot bootstrap: kick each job whose source has never completed + // a successful run. Decoupling per-source means an upgrade from an + // older image — where OSV ran but the enrichment cron hasn't fired yet + // — still backfills KEV/EPSS/GHSA/Wolfi instead of waiting up to 24h. + // Each job runs under its own concurrency guard so the regular cron + // tick that fires later just no-ops. if (config.REFUSE_BOOTSTRAP_ON_EMPTY) { - const row = rawDb + const done = readSourcesDone(rawDb); + const vulnRow = rawDb .prepare(`SELECT COUNT(*) AS n FROM vulnerabilities`) .get() as { n: number } | undefined; - if (!row || row.n === 0) { - console.log( - `refuse: empty vulnerabilities table — kicking initial pass on all sources in parallel ` + - `(osv across ${OSV_ROTATION_TOTAL} ecosystems @ 1/tick, kev, epss, ghsa, wolfi). ` + - `Watch /readyz for progress.`, - ); - // Fire-and-forget; each job's logging lives inside makeJob. + const vulnsEmpty = !vulnRow || vulnRow.n === 0; + const enrichmentSources = ["kev", "epss", "ghsa_direct", "wolfi"] as const; + const missingEnrichment = enrichmentSources.filter((s) => !done.has(s)); + + const kicks: string[] = []; + if (vulnsEmpty || !done.has("osv")) { + kicks.push("osv"); osvJob().catch(() => {}); + } + if (!done.has("deps_dev")) { + kicks.push("deps-dev"); depsDevJob().catch(() => {}); + } + if (missingEnrichment.length > 0) { + kicks.push(`enrichment(${missingEnrichment.join(",")})`); enrichJob().catch(() => {}); } + if (kicks.length > 0) { + console.log( + `refuse: bootstrap — kicking ${kicks.join(" + ")} in parallel. Watch /readyz for progress.`, + ); + } } }, stop(): void {