Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions apps/server/src/ingest/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,24 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis
);
}

// If we exited on the time budget, leave next_index pointing back at this
// ecosystem so the next cron tick resumes from the same watermark.
const advance = stopped ? 0 : 1;
// Rotation policy:
// stopped + processed > 0 → mid-work, stay here next tick to finish
// stopped + processed == 0 → caught up (read the whole zip, rejected
// every record on the watermark), advance to avoid an infinite stall
// re-reading the same archive forever
// not stopped → finished naturally, advance
const advance = stopped && processed > 0 ? 0 : 1;
const nextCursor: OsvCursor = {
next_index: (cursor.next_index + advance) % ROTATION.length,
watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }};
await recordIngestionState(db, "osv", "ok", processed, {
lastModified: JSON.stringify(nextCursor)});
logIngest(
tag,
`✓ done — ${processed} records in ${fmtElapsed(startedAt)}${stopped ? " (budget hit, resumes next tick)" : ""}`,
);
const tail = !stopped
? ""
: processed > 0
? " (budget hit, resumes next tick)"
: " (caught up, advancing rotation)";
logIngest(tag, `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${tail}`);
} catch (e) {
// Try to flush whatever made it through before the failure so we don't
// re-process those records on the next run.
Expand Down
38 changes: 26 additions & 12 deletions apps/server/src/ingest/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,39 @@ export function buildScheduler(
`refuse: cron started — osv every ${config.REFUSE_OSV_FREQUENCY}m, deps.dev every ${config.REFUSE_DEPS_DEV_FREQUENCY}m, enrichment "${config.REFUSE_ENRICHMENT_CRON}"`,
);

// First-boot bootstrap: if the vulnerabilities table is empty, kick all
// three jobs in parallel so the server has data within minutes instead
// of waiting up to 24h for the enrichment cron tick. Each runs under its
// own concurrency guard so the cron tick that fires later just no-ops.
// First-boot bootstrap: kick each job whose source has never completed
// a successful run. Decoupling per-source means an upgrade from an
// older image — where OSV ran but the enrichment cron hasn't fired yet
// — still backfills KEV/EPSS/GHSA/Wolfi instead of waiting up to 24h.
// Each job runs under its own concurrency guard so the regular cron
// tick that fires later just no-ops.
if (config.REFUSE_BOOTSTRAP_ON_EMPTY) {
const row = rawDb
const done = readSourcesDone(rawDb);
const vulnRow = rawDb
.prepare(`SELECT COUNT(*) AS n FROM vulnerabilities`)
.get() as { n: number } | undefined;
if (!row || row.n === 0) {
console.log(
`refuse: empty vulnerabilities table — kicking initial pass on all sources in parallel ` +
`(osv across ${OSV_ROTATION_TOTAL} ecosystems @ 1/tick, kev, epss, ghsa, wolfi). ` +
`Watch /readyz for progress.`,
);
// Fire-and-forget; each job's logging lives inside makeJob.
const vulnsEmpty = !vulnRow || vulnRow.n === 0;
const enrichmentSources = ["kev", "epss", "ghsa_direct", "wolfi"] as const;
const missingEnrichment = enrichmentSources.filter((s) => !done.has(s));

const kicks: string[] = [];
if (vulnsEmpty || !done.has("osv")) {
kicks.push("osv");
osvJob().catch(() => {});
}
if (!done.has("deps_dev")) {
kicks.push("deps-dev");
depsDevJob().catch(() => {});
}
if (missingEnrichment.length > 0) {
kicks.push(`enrichment(${missingEnrichment.join(",")})`);
enrichJob().catch(() => {});
}
if (kicks.length > 0) {
console.log(
`refuse: bootstrap — kicking ${kicks.join(" + ")} in parallel. Watch /readyz for progress.`,
);
}
}
},
stop(): void {
Expand Down