From 7be200861159a5f81e832846e9d5ea2838d0b2a8 Mon Sep 17 00:00:00 2001 From: gok03 Date: Wed, 10 Jun 2026 16:18:55 +0530 Subject: [PATCH] feat(ingest): bulk all.zip on first boot + parallel per-tick deltas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 1-ecosystem-per-tick rotation was a leftover from the original Workers runtime, where each cron call was capped at 5 min of CPU. On a self-hosted Node server neither limit applies — round-robin starvation just made the cold seed take ~2h. Two changes lift it to ~3 min: 1. runOsvBootstrap (cron.ts): on first boot, stream OSV's bulk all.zip (~200 MB compressed, every ecosystem in one file). Memory stays bounded — each record is upserted as the zip streams. Watermarks get populated per ecosystem from `affected[].package.ecosystem`, so subsequent delta ticks know exactly what's already loaded. 2. runOsvDelta (cron.ts): drop the 60s wall-clock budget and process every ecosystem in parallel with a concurrency cap. Configurable via REFUSE_OSV_CONCURRENCY (default 4). After the first bootstrap, deltas are tiny per ecosystem so each tick finishes in seconds. 3. scheduler.ts: when OSV's ingestion_state has never recorded a success, kick the new osvBulkJob instead of osvJob. Banner updates: refuse: bootstrap — kicking osv:bulk (all ecosystems in one pass) + deps-dev + enrichment(kev,epss,ghsa_direct,wolfi) in parallel. 4. openAllArchive() added to OsvFetcher (streaming version of the existing fetchAllArchive, which buffers the entire payload). 5. config.ts: REFUSE_OSV_CONCURRENCY env var (default 4) bounds the per-tick parallelism so we don't hammer GCS or burn local bandwidth. Tests + typecheck stay green; scheduler-test fixture extended with the new config field. --- apps/server/src/config.ts | 2 + apps/server/src/ingest/cron.ts | 283 ++++++++++++++++------- apps/server/src/ingest/scheduler.test.ts | 1 + apps/server/src/ingest/scheduler.ts | 20 +- apps/server/src/ingest/sources/osv.ts | 17 ++ 5 files changed, 226 insertions(+), 97 deletions(-) diff --git a/apps/server/src/config.ts b/apps/server/src/config.ts index 7ef85b0..95612d3 100644 --- a/apps/server/src/config.ts +++ b/apps/server/src/config.ts @@ -16,6 +16,7 @@ const RECOGNISED_KEYS = new Set([ "REFUSE_DEPS_DEV_FREQUENCY", "REFUSE_ENRICHMENT_CRON", "REFUSE_BOOTSTRAP_ON_EMPTY", + "REFUSE_OSV_CONCURRENCY", "REFUSE_LOG_LEVEL", "REFUSE_GITHUB_TOKEN", "REFUSE_DISABLE_INGEST", @@ -42,6 +43,7 @@ const schema = z.object({ REFUSE_DEPS_DEV_FREQUENCY: positiveInt.default(15), REFUSE_ENRICHMENT_CRON: z.string().default("0 5 * * *"), REFUSE_BOOTSTRAP_ON_EMPTY: boolish.default("true"), + REFUSE_OSV_CONCURRENCY: positiveInt.default(4), REFUSE_LOG_LEVEL: z.enum(["debug", "info", "warn", "error"]).default("info"), REFUSE_GITHUB_TOKEN: z.string().optional(), REFUSE_DISABLE_INGEST: boolish.default("false"), diff --git a/apps/server/src/ingest/cron.ts b/apps/server/src/ingest/cron.ts index 0beb4a5..4cb467d 100644 --- a/apps/server/src/ingest/cron.ts +++ b/apps/server/src/ingest/cron.ts @@ -134,33 +134,185 @@ function parseCursor(raw: string | null): OsvCursor { /** Flush every N normalized records into D1 to keep memory bounded. */ const OSV_FLUSH_AT = 80; +/** Default ecosystems fetched in parallel per delta tick. Overridable via REFUSE_OSV_CONCURRENCY. */ +export const OSV_DEFAULT_CONCURRENCY = 4; + +interface EcosystemPassResult { + ecosystem: string; + processed: number; + newWatermark: string; + affected: AffectedKey[]; + skipped404: boolean; + error?: string; +} + /** - * Wall-clock budget per cron run. The npm zip is ~200 MB compressed and the - * delta from a cold watermark is tens of thousands of records — far more than - * one invocation can handle. We process a slice each run, persist the - * watermark, and rely on the rotation cursor to resume here next tick. - * Bounded to leave headroom under the Workers Paid CPU cap (5 min). + * Pull one ecosystem's per-ecosystem zip, apply watermark, upsert records, and + * return aggregated stats. No wall-clock budget — designed for a self-hosted + * Node runtime that doesn't need the old Worker CPU cap. The caller decides + * concurrency by spawning multiple invocations. */ -const OSV_RUN_BUDGET_MS = 60 * 1000; +async function processOsvEcosystem( + db: D1LikeDatabase, + osv: ReturnType, + ecosystem: string, + initialWatermark: string, +): Promise { + const tag = `osv:${ecosystem}`; + const startedAt = Date.now(); + let processed = 0; + let lastReportedAt = 0; + let newWatermark = initialWatermark; + const affected = new Map(); + const buffer: ReturnType[] = []; + const flush = async (): Promise => { + const batch = buffer.filter((b): b is NonNullable => b !== null); + buffer.length = 0; + if (batch.length === 0) return; + const result = await upsertRecords(db, batch); + processed += result.vulnerabilities_written; + for (const k of result.affected_pairs_changed) { + affected.set(`${k.ecosystem}::${k.package_name}`, k); + } + if (Date.now() - lastReportedAt >= 5000) { + const est = OSV_ECOSYSTEM_ESTIMATE[ecosystem] ?? OSV_ECOSYSTEM_ESTIMATE_DEFAULT; + logIngest(tag, `${bar(processed / est)} • ${processed} records • ${fmtElapsed(startedAt)}`); + lastReportedAt = Date.now(); + } + }; + + const body = await osv.openEcosystemArchive(ecosystem); + if (body === null) { + logIngest(tag, `· archive 404 — ecosystem skipped`); + return { + ecosystem, + processed: 0, + newWatermark: initialWatermark, + affected: [], + skipped404: true, + }; + } -export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promise { + await streamZipRecords(body, { + onRecord: async ({ record }) => { + if (record.modified <= initialWatermark) return; + const norm = normalizeOsv(record); + if (!norm) return; + buffer.push(norm); + if (record.modified > newWatermark) newWatermark = record.modified; + if (buffer.length >= OSV_FLUSH_AT) await flush(); + }, + }); + await flush(); + + logIngest(tag, `✓ done — ${processed} records in ${fmtElapsed(startedAt)}`); + return { + ecosystem, + processed, + newWatermark, + affected: Array.from(affected.values()), + skipped404: false, + }; +} + +/** + * Delta refresh across all ecosystems, run in parallel with a soft concurrency + * cap. Replaces the previous "1 ecosystem per 5-min tick under a 60s budget" + * design that was a workaround for the Workers CPU cap — self-hosted Node has + * neither limit, so we just pull everything every tick. After the first + * bootstrap, deltas are small (per-ecosystem watermark filtering rejects most + * records before normalization). + */ +export async function runOsvDelta( + db: D1LikeDatabase, + cards: CardReader, + opts: { concurrency?: number } = {}, +): Promise { const osv = createOsvFetcher(); + const concurrency = Math.max(1, opts.concurrency ?? OSV_DEFAULT_CONCURRENCY); + const stateRow = await db .prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'osv'`) .first<{ last_modified: string | null }>(); const cursor = parseCursor(stateRow?.last_modified ?? null); - const ecosystem = ROTATION[cursor.next_index % ROTATION.length]!; - const watermark = cursor.watermarks[ecosystem] ?? "1970-01-01T00:00:00Z"; - const tag = `osv:${ecosystem}`; + logIngest( + "osv:delta", + `▶ starting (${ROTATION.length} ecosystems, concurrency=${concurrency})`, + ); + const startedAt = Date.now(); - let processed = 0; - let lastReportedAt = 0; - const allChanged = new Map(); - let newWatermark = watermark; + const queue = [...ROTATION]; + const watermarks = { ...cursor.watermarks }; + const allAffected = new Map(); + let totalProcessed = 0; + const errors: string[] = []; + + const worker = async (): Promise => { + for (;;) { + const ecosystem = queue.shift(); + if (!ecosystem) return; + const initial = watermarks[ecosystem] ?? "1970-01-01T00:00:00Z"; + try { + const r = await processOsvEcosystem(db, osv, ecosystem, initial); + watermarks[ecosystem] = r.newWatermark; + totalProcessed += r.processed; + for (const k of r.affected) { + allAffected.set(`${k.ecosystem}::${k.package_name}`, k); + } + } catch (e) { + const msg = (e as Error).message; + errors.push(`${ecosystem}: ${msg}`); + logIngest(`osv:${ecosystem}`, `✗ ${msg}`); + } + } + }; + + await Promise.all(Array.from({ length: concurrency }, () => worker())); + + const affectedList = Array.from(allAffected.values()); + if (affectedList.length > 0) { + await publishCards(db, cards, affectedList, { concurrency: 16 }); + } + + const nextCursor: OsvCursor = { next_index: 0, watermarks }; + await recordIngestionState(db, "osv", errors.length ? "error" : "ok", totalProcessed, { + lastModified: JSON.stringify(nextCursor), + ...(errors.length ? { error: errors.slice(0, 3).join(" | ") } : {}), + }); + + logIngest( + "osv:delta", + `✓ done — ${totalProcessed} records across ${ROTATION.length} ecosystems in ${fmtElapsed(startedAt)}${errors.length ? ` (${errors.length} ecosystem errors)` : ""}`, + ); +} + +/** + * Bulk first-boot seed: stream OSV's `all.zip` (every ecosystem in one file) + * and upsert everything in a single pass. ~200 MB compressed download, but + * we process records as the zip streams so the resident set stays bounded + * by `OSV_FLUSH_AT`. Per-ecosystem watermarks get populated from each record's + * `affected[].package.ecosystem`, so the subsequent `runOsvDelta` ticks know + * exactly what's already loaded. + * + * Only called when OSV's `ingestion_state` row has never been recorded — + * subsequent runs use the much smaller per-ecosystem deltas. + */ +export async function runOsvBootstrap( + db: D1LikeDatabase, + cards: CardReader, +): Promise { + const osv = createOsvFetcher(); + const tag = "osv:bulk"; const startedAt = Date.now(); + logIngest(tag, "▶ starting (downloading OSV all.zip — every ecosystem)"); + let processed = 0; + let lastReportedAt = 0; + const watermarks: Record = {}; + const allAffected = new Map(); const buffer: ReturnType[] = []; + const flush = async (): Promise => { const batch = buffer.filter((b): b is NonNullable => b !== null); buffer.length = 0; @@ -168,99 +320,54 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis const result = await upsertRecords(db, batch); processed += result.vulnerabilities_written; for (const k of result.affected_pairs_changed) { - allChanged.set(`${k.ecosystem}::${k.package_name}`, k); + allAffected.set(`${k.ecosystem}::${k.package_name}`, k); } - // Emit progress every ~5s so docker-logs viewers see a steady tick - // instead of one wall of text at the end. if (Date.now() - lastReportedAt >= 5000) { - const est = OSV_ECOSYSTEM_ESTIMATE[ecosystem] ?? OSV_ECOSYSTEM_ESTIMATE_DEFAULT; - logIngest(tag, `${bar(processed / est)} • ${processed} records • ${fmtElapsed(startedAt)}`); + // Bulk archive yields ~150K records; scale the bar against that. + logIngest(tag, `${bar(processed / 150_000)} • ${processed} records • ${fmtElapsed(startedAt)}`); lastReportedAt = Date.now(); } }; - logIngest( - tag, - `▶ starting (ecosystem ${(cursor.next_index % ROTATION.length) + 1}/${ROTATION.length})`, - ); - try { - const body = await osv.openEcosystemArchive(ecosystem); - // OSV stopped publishing this ecosystem (typically a distro after EOL). - // Skip it, advance the rotation, and record ok — otherwise the cron gets - // stuck retrying the same dead URL every 5 min forever and `last_ok_at` - // never advances, tripping status-page warnings. - if (body === null) { - const skipCursor: OsvCursor = { - next_index: (cursor.next_index + 1) % ROTATION.length, - watermarks: cursor.watermarks}; - await recordIngestionState(db, "osv", "ok", 0, { - lastModified: JSON.stringify(skipCursor), - error: `${ecosystem}: archive 404 (skipped)`}); - logIngest(tag, `· archive 404 — skipping, advancing rotation`); - return; - } - const { stopped } = await streamZipRecords(body, { - shouldStop: () => Date.now() - startedAt > OSV_RUN_BUDGET_MS, + const body = await osv.openAllArchive(); + await streamZipRecords(body, { onRecord: async ({ record }) => { - if (record.modified <= watermark) return; const norm = normalizeOsv(record); if (!norm) return; buffer.push(norm); - if (record.modified > newWatermark) newWatermark = record.modified; + for (const aff of record.affected ?? []) { + const eco = aff.package?.ecosystem; + if (!eco) continue; + const prev = watermarks[eco] ?? "1970-01-01T00:00:00Z"; + if (record.modified > prev) watermarks[eco] = record.modified; + } if (buffer.length >= OSV_FLUSH_AT) await flush(); - }}); + }, + }); await flush(); - // Cap card republishing per tick. Each card costs ~5 subrequests - // (3 D1 reads + 1 KV read + 1 KV write) and the worker has a - // 1000-subrequest hard cap per invocation. Beyond this many we'd - // bust the cap on the closing recordIngestionState write. Anything - // not republished here gets picked up by the next cron tick or the - // daily card recompute. - const CARDS_PER_TICK = 50; - const changedList = Array.from(allChanged.values()); - if (changedList.length > 0) { - await publishCards( - db, - cards, - changedList.slice(0, CARDS_PER_TICK), - { concurrency: 16 }, - ); + const affectedList = Array.from(allAffected.values()); + if (affectedList.length > 0) { + logIngest(tag, `→ publishing ${affectedList.length} card updates`); + await publishCards(db, cards, affectedList, { concurrency: 16 }); } - // Rotation policy: - // stopped + processed > 0 → mid-work, stay here next tick to finish - // stopped + processed == 0 → caught up (read the whole zip, rejected - // every record on the watermark), advance to avoid an infinite stall - // re-reading the same archive forever - // not stopped → finished naturally, advance - const advance = stopped && processed > 0 ? 0 : 1; - const nextCursor: OsvCursor = { - next_index: (cursor.next_index + advance) % ROTATION.length, - watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }}; + const nextCursor: OsvCursor = { next_index: 0, watermarks }; await recordIngestionState(db, "osv", "ok", processed, { - lastModified: JSON.stringify(nextCursor)}); - const tail = !stopped - ? "" - : processed > 0 - ? " (budget hit, resumes next tick)" - : " (caught up, advancing rotation)"; - logIngest(tag, `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${tail}`); + lastModified: JSON.stringify(nextCursor), + }); + logIngest( + tag, + `✓ done — ${processed} records across ${Object.keys(watermarks).length} ecosystems in ${fmtElapsed(startedAt)}`, + ); } catch (e) { - // Try to flush whatever made it through before the failure so we don't - // re-process those records on the next run. - try { - await flush(); - } catch { - // ignore secondary failure - } - const persistedCursor: OsvCursor = { - next_index: cursor.next_index, - watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }}; + const msg = (e as Error).message; + try { await flush(); } catch { /* ignore */ } await recordIngestionState(db, "osv", "error", processed, { - error: `${ecosystem}: ${(e as Error).message}`, - lastModified: JSON.stringify(persistedCursor)}); + error: `bulk bootstrap: ${msg}`, + }); + logIngest(tag, `✗ failed in ${fmtElapsed(startedAt)}: ${msg}`); throw e; } } diff --git a/apps/server/src/ingest/scheduler.test.ts b/apps/server/src/ingest/scheduler.test.ts index 392376c..8b80965 100644 --- a/apps/server/src/ingest/scheduler.test.ts +++ b/apps/server/src/ingest/scheduler.test.ts @@ -19,6 +19,7 @@ function makeConfig(overrides: Partial = {}): Config { REFUSE_ENRICHMENT_CRON: "0 5 * * *", REFUSE_DISABLE_INGEST: true, // tests never want real cron tasks running REFUSE_BOOTSTRAP_ON_EMPTY: false, + REFUSE_OSV_CONCURRENCY: 4, REFUSE_CARD_CACHE_SIZE: 1000, REFUSE_CARD_CACHE_TTL_SECONDS: 60, ...overrides, diff --git a/apps/server/src/ingest/scheduler.ts b/apps/server/src/ingest/scheduler.ts index 9b98cc6..d741c86 100644 --- a/apps/server/src/ingest/scheduler.ts +++ b/apps/server/src/ingest/scheduler.ts @@ -21,7 +21,7 @@ import { LANGUAGE_ECOSYSTEMS, CI_ECOSYSTEMS } from "@refuse/shared"; import type { Config } from "../config"; import type { CardReader } from "../cards"; import type { D1LikeDatabase } from "../db/adapter"; -import { runOsvDelta, runDepsDevRefresh, runDailyEnrichment } from "./cron"; +import { runOsvDelta, runOsvBootstrap, runDepsDevRefresh, runDailyEnrichment } from "./cron"; interface JobDeps { db: D1LikeDatabase; @@ -113,7 +113,10 @@ export function buildScheduler( config: Config, deps: JobDeps, ): Scheduler { - const osvJob = makeJob("osv-delta", () => runOsvDelta(deps.db, deps.cards)); + const osvJob = makeJob("osv-delta", () => + runOsvDelta(deps.db, deps.cards, { concurrency: config.REFUSE_OSV_CONCURRENCY }), + ); + const osvBulkJob = makeJob("osv-bulk", () => runOsvBootstrap(deps.db, deps.cards)); const depsDevJob = makeJob("deps-dev", () => runDepsDevRefresh(deps.db, deps.cards)); const enrichJob = makeJob("enrichment", () => runDailyEnrichment(deps.db, deps.cards, deps.githubToken !== undefined ? { GITHUB_TOKEN: deps.githubToken } : {}), @@ -160,17 +163,16 @@ export function buildScheduler( // tick that fires later just no-ops. if (config.REFUSE_BOOTSTRAP_ON_EMPTY) { const done = readSourcesDone(rawDb); - const vulnRow = rawDb - .prepare(`SELECT COUNT(*) AS n FROM vulnerabilities`) - .get() as { n: number } | undefined; - const vulnsEmpty = !vulnRow || vulnRow.n === 0; const enrichmentSources = ["kev", "epss", "ghsa_direct", "wolfi"] as const; const missingEnrichment = enrichmentSources.filter((s) => !done.has(s)); const kicks: string[] = []; - if (vulnsEmpty || !done.has("osv")) { - kicks.push("osv"); - osvJob().catch(() => {}); + if (!done.has("osv")) { + // First boot: use the bulk all.zip — pulls every ecosystem in + // one streaming download. ~200 MB compressed, ~2-3 min total — + // ~50× faster than walking the 26-ecosystem rotation at 1/tick. + kicks.push("osv:bulk (all ecosystems in one pass)"); + osvBulkJob().catch(() => {}); } if (!done.has("deps_dev")) { kicks.push("deps-dev"); diff --git a/apps/server/src/ingest/sources/osv.ts b/apps/server/src/ingest/sources/osv.ts index 2e2c8a5..9412964 100644 --- a/apps/server/src/ingest/sources/osv.ts +++ b/apps/server/src/ingest/sources/osv.ts @@ -28,6 +28,13 @@ export interface OsvFetcher { openEcosystemArchive(ecosystem: string): Promise | null>; /** Fetch the full bulk archive — only call from the local backfill script. */ fetchAllArchive(): Promise; + /** + * Streaming version of the bulk archive — pair with `streamZipRecords` so + * we don't materialize the ~600 MB uncompressed payload in memory. Safe to + * use from any host that isn't memory-constrained (i.e. anything other + * than the Workers runtime). + */ + openAllArchive(): Promise>; } export interface OsvFetcherDeps { @@ -84,6 +91,16 @@ export function createOsvFetcher(deps: OsvFetcherDeps = {}): OsvFetcher { throw new Error(`OSV GCS fetch failed for all.zip: ${res.status}`); } return new Uint8Array(await res.arrayBuffer()); + }, + + async openAllArchive() { + const res = await f(`${OSV_GCS_BASE}/all.zip`); + if (!res.ok || !res.body) { + throw new Error( + `OSV GCS fetch failed for all.zip: ${res.status}${res.body ? "" : " (no body)"}`, + ); + } + return res.body; }}; }