Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/server/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const RECOGNISED_KEYS = new Set([
"REFUSE_DEPS_DEV_FREQUENCY",
"REFUSE_ENRICHMENT_CRON",
"REFUSE_BOOTSTRAP_ON_EMPTY",
"REFUSE_OSV_CONCURRENCY",
"REFUSE_LOG_LEVEL",
"REFUSE_GITHUB_TOKEN",
"REFUSE_DISABLE_INGEST",
Expand All @@ -42,6 +43,7 @@ const schema = z.object({
REFUSE_DEPS_DEV_FREQUENCY: positiveInt.default(15),
REFUSE_ENRICHMENT_CRON: z.string().default("0 5 * * *"),
REFUSE_BOOTSTRAP_ON_EMPTY: boolish.default("true"),
REFUSE_OSV_CONCURRENCY: positiveInt.default(4),
REFUSE_LOG_LEVEL: z.enum(["debug", "info", "warn", "error"]).default("info"),
REFUSE_GITHUB_TOKEN: z.string().optional(),
REFUSE_DISABLE_INGEST: boolish.default("false"),
Expand Down
283 changes: 195 additions & 88 deletions apps/server/src/ingest/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,133 +134,240 @@ function parseCursor(raw: string | null): OsvCursor {

/**
 * Number of normalized records buffered before they are written to the
 * database with `upsertRecords`. Flushing at this size keeps the in-memory
 * buffer bounded while an archive streams.
 */
const OSV_FLUSH_AT = 80;
/**
 * Fallback number of ecosystems `runOsvDelta` processes in parallel per delta
 * tick when the caller passes no `concurrency` option. The scheduler passes
 * the REFUSE_OSV_CONCURRENCY config value, which takes precedence over this.
 */
export const OSV_DEFAULT_CONCURRENCY = 4;

/** Outcome of processing a single ecosystem's OSV archive during a delta run. */
interface EcosystemPassResult {
  /** Ecosystem name the archive was requested for. */
  ecosystem: string;
  /** True when the archive could not be opened (fetcher returned `null`) and the ecosystem was skipped. */
  skipped404: boolean;
  /** Sum of `vulnerabilities_written` over every flushed batch. */
  processed: number;
  /** Greatest `modified` value among buffered records, or the incoming watermark when none was newer. */
  newWatermark: string;
  /** De-duplicated ecosystem/package pairs reported as changed by the upserts. */
  affected: AffectedKey[];
  /** Optional failure description. */
  error?: string;
}

/**
* Wall-clock budget per cron run. The npm zip is ~200 MB compressed and the
* delta from a cold watermark is tens of thousands of records — far more than
* one invocation can handle. We process a slice each run, persist the
* watermark, and rely on the rotation cursor to resume here next tick.
* Bounded to leave headroom under the Workers Paid CPU cap (5 min).
* Pull one ecosystem's per-ecosystem zip, apply watermark, upsert records, and
* return aggregated stats. No wall-clock budget — designed for a self-hosted
* Node runtime that doesn't need the old Worker CPU cap. The caller decides
* concurrency by spawning multiple invocations.
*/
const OSV_RUN_BUDGET_MS = 60 * 1000;
/**
 * Ingest one ecosystem's OSV archive: stream its records, keep those whose
 * `modified` is greater than `initialWatermark`, normalize them, and upsert
 * them in batches of `OSV_FLUSH_AT`.
 *
 * @param db - Database handle handed to `upsertRecords`.
 * @param osv - Fetcher used to open the per-ecosystem archive.
 * @param ecosystem - Ecosystem name; also used to build the log tag.
 * @param initialWatermark - Records with `modified` at or below this value are skipped.
 * @returns Per-ecosystem stats. When the fetcher returns `null` for the
 *   archive, nothing is processed and `skipped404` is true.
 * @throws Whatever the fetcher, the zip stream, or `upsertRecords` throws. The
 *   watermark gathered so far is not returned in that case.
 */
async function processOsvEcosystem(
  db: D1LikeDatabase,
  osv: ReturnType<typeof createOsvFetcher>,
  ecosystem: string,
  initialWatermark: string,
): Promise<EcosystemPassResult> {
  const tag = `osv:${ecosystem}`;
  const startedAt = Date.now();
  let processed = 0;
  let lastReportedAt = 0;
  let newWatermark = initialWatermark;
  const affected = new Map<string, AffectedKey>();
  // Only non-null normalized records are pushed, so the buffer never needs
  // null filtering at flush time.
  const buffer: NonNullable<ReturnType<typeof normalizeOsv>>[] = [];

  const flush = async (): Promise<void> => {
    if (buffer.length === 0) return;
    // Detach the batch before awaiting so records buffered later are not lost.
    const batch = buffer.splice(0, buffer.length);
    const result = await upsertRecords(db, batch);
    processed += result.vulnerabilities_written;
    for (const k of result.affected_pairs_changed) {
      affected.set(`${k.ecosystem}::${k.package_name}`, k);
    }
    // Report progress at most once every 5 seconds.
    if (Date.now() - lastReportedAt >= 5000) {
      const est = OSV_ECOSYSTEM_ESTIMATE[ecosystem] ?? OSV_ECOSYSTEM_ESTIMATE_DEFAULT;
      // Clamp so an estimate that is too low never hands bar() a ratio above 1.
      const ratio = Math.min(1, processed / est);
      logIngest(tag, `${bar(ratio)} • ${processed} records • ${fmtElapsed(startedAt)}`);
      lastReportedAt = Date.now();
    }
  };

  const body = await osv.openEcosystemArchive(ecosystem);
  if (body === null) {
    logIngest(tag, `· archive 404 — ecosystem skipped`);
    return {
      ecosystem,
      processed: 0,
      newWatermark: initialWatermark,
      affected: [],
      skipped404: true,
    };
  }

  await streamZipRecords(body, {
    onRecord: async ({ record }) => {
      // NOTE(review): timestamps are compared as plain strings; this assumes
      // every `modified` value uses one fixed-width format — confirm.
      if (record.modified <= initialWatermark) return;
      const norm = normalizeOsv(record);
      if (!norm) return;
      buffer.push(norm);
      if (record.modified > newWatermark) newWatermark = record.modified;
      if (buffer.length >= OSV_FLUSH_AT) await flush();
    },
  });
  // Write whatever is left below the batch threshold.
  await flush();

  logIngest(tag, `✓ done — ${processed} records in ${fmtElapsed(startedAt)}`);
  return {
    ecosystem,
    processed,
    newWatermark,
    affected: Array.from(affected.values()),
    skipped404: false,
  };
}

/**
 * Delta refresh across every ecosystem in `ROTATION`, processed by a fixed
 * pool of workers that pull ecosystem names from a shared queue.
 *
 * Each ecosystem is filtered by its own watermark from the stored cursor. A
 * failure in one ecosystem is logged and collected; that ecosystem keeps its
 * previous watermark and the remaining ecosystems still run. Afterwards the
 * affected cards are republished and the merged cursor is persisted with
 * status "error" if any ecosystem failed, "ok" otherwise.
 *
 * @param db - Database handle for state reads/writes and upserts.
 * @param cards - Card store handed to `publishCards`.
 * @param opts - `concurrency`: requested number of parallel workers. It is
 *   floored, forced to at least 1, and capped at the number of ecosystems;
 *   non-finite values fall back to `OSV_DEFAULT_CONCURRENCY`.
 * @throws If reading state, `publishCards`, or `recordIngestionState` throws.
 *   A `publishCards` failure propagates before the ingestion state is
 *   recorded, so the advanced watermarks are not persisted in that case.
 */
export async function runOsvDelta(
  db: D1LikeDatabase,
  cards: CardReader,
  opts: { concurrency?: number } = {},
): Promise<void> {
  const osv = createOsvFetcher();

  // Math.max(1, NaN) is NaN, which would spawn zero workers and record an "ok"
  // run that did nothing; normalize the request to a usable integer instead.
  const requested = opts.concurrency ?? OSV_DEFAULT_CONCURRENCY;
  const wanted = Number.isFinite(requested) ? Math.floor(requested) : OSV_DEFAULT_CONCURRENCY;
  const concurrency = Math.max(1, Math.min(wanted, ROTATION.length));

  const stateRow = await db
    .prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'osv'`)
    .first<{ last_modified: string | null }>();
  const cursor = parseCursor(stateRow?.last_modified ?? null);

  logIngest(
    "osv:delta",
    `▶ starting (${ROTATION.length} ecosystems, concurrency=${concurrency})`,
  );
  const startedAt = Date.now();

  const queue = [...ROTATION];
  // Copy so the parsed cursor is left untouched while workers update entries.
  const watermarks = { ...cursor.watermarks };
  const allAffected = new Map<string, AffectedKey>();
  let totalProcessed = 0;
  const errors: string[] = [];

  const worker = async (): Promise<void> => {
    for (;;) {
      const ecosystem = queue.shift();
      if (!ecosystem) return;
      const initial = watermarks[ecosystem] ?? "1970-01-01T00:00:00Z";
      try {
        const r = await processOsvEcosystem(db, osv, ecosystem, initial);
        watermarks[ecosystem] = r.newWatermark;
        totalProcessed += r.processed;
        for (const k of r.affected) {
          allAffected.set(`${k.ecosystem}::${k.package_name}`, k);
        }
      } catch (e) {
        // Anything can be thrown, so do not assume an Error instance.
        const msg = e instanceof Error ? e.message : String(e);
        errors.push(`${ecosystem}: ${msg}`);
        logIngest(`osv:${ecosystem}`, `✗ ${msg}`);
      }
    }
  };

  await Promise.all(Array.from({ length: concurrency }, () => worker()));

  const affectedList = Array.from(allAffected.values());
  if (affectedList.length > 0) {
    await publishCards(db, cards, affectedList, { concurrency: 16 });
  }

  const nextCursor: OsvCursor = { next_index: 0, watermarks };
  await recordIngestionState(db, "osv", errors.length ? "error" : "ok", totalProcessed, {
    lastModified: JSON.stringify(nextCursor),
    // Only the first three ecosystem errors are stored.
    ...(errors.length ? { error: errors.slice(0, 3).join(" | ") } : {}),
  });

  logIngest(
    "osv:delta",
    `✓ done — ${totalProcessed} records across ${ROTATION.length} ecosystems in ${fmtElapsed(startedAt)}${errors.length ? ` (${errors.length} ecosystem errors)` : ""}`,
  );
}

/**
 * Bulk first-boot seed: stream OSV's `all.zip` (every ecosystem in one file)
 * and upsert everything in a single pass. Records are processed as the zip
 * streams, flushing every `OSV_FLUSH_AT` normalized records so the buffer
 * stays bounded.
 *
 * Per-ecosystem watermarks are derived from each record's
 * `affected[].package.ecosystem` and stored in the cursor on success, so later
 * `runOsvDelta` ticks can filter on them.
 *
 * @param db - Database handle for upserts and ingestion-state writes.
 * @param cards - Card store handed to `publishCards`.
 * @throws Rethrows the original failure after attempting a final flush and
 *   recording an "error" ingestion state. The error state is written without a
 *   cursor.
 */
export async function runOsvBootstrap(
  db: D1LikeDatabase,
  cards: CardReader,
): Promise<void> {
  /** Rough record count of the bulk archive, used only to scale the progress bar. */
  const BULK_RECORD_ESTIMATE = 150_000;

  const osv = createOsvFetcher();
  const tag = "osv:bulk";
  const startedAt = Date.now();
  logIngest(tag, "▶ starting (downloading OSV all.zip — every ecosystem)");

  let processed = 0;
  let lastReportedAt = 0;
  const watermarks: Record<string, string> = {};
  const allAffected = new Map<string, AffectedKey>();
  // Only non-null normalized records are pushed, so no null filtering is needed.
  const buffer: NonNullable<ReturnType<typeof normalizeOsv>>[] = [];

  const flush = async (): Promise<void> => {
    if (buffer.length === 0) return;
    // Detach the batch before awaiting so records buffered later are not lost.
    const batch = buffer.splice(0, buffer.length);
    const result = await upsertRecords(db, batch);
    processed += result.vulnerabilities_written;
    for (const k of result.affected_pairs_changed) {
      allAffected.set(`${k.ecosystem}::${k.package_name}`, k);
    }
    // Emit progress every ~5s so log viewers see a steady tick instead of one
    // wall of text at the end.
    if (Date.now() - lastReportedAt >= 5000) {
      // Clamp so an archive larger than the estimate never yields a ratio above 1.
      const ratio = Math.min(1, processed / BULK_RECORD_ESTIMATE);
      logIngest(tag, `${bar(ratio)} • ${processed} records • ${fmtElapsed(startedAt)}`);
      lastReportedAt = Date.now();
    }
  };

  try {
    const body = await osv.openAllArchive();
    await streamZipRecords(body, {
      onRecord: async ({ record }) => {
        const norm = normalizeOsv(record);
        if (!norm) return;
        buffer.push(norm);
        // Track the newest `modified` seen for every ecosystem this record touches.
        for (const aff of record.affected ?? []) {
          const eco = aff.package?.ecosystem;
          if (!eco) continue;
          const prev = watermarks[eco] ?? "1970-01-01T00:00:00Z";
          if (record.modified > prev) watermarks[eco] = record.modified;
        }
        if (buffer.length >= OSV_FLUSH_AT) await flush();
      },
    });
    // Write whatever is left below the batch threshold.
    await flush();

    const affectedList = Array.from(allAffected.values());
    if (affectedList.length > 0) {
      logIngest(tag, `→ publishing ${affectedList.length} card updates`);
      await publishCards(db, cards, affectedList, { concurrency: 16 });
    }

    const nextCursor: OsvCursor = { next_index: 0, watermarks };
    await recordIngestionState(db, "osv", "ok", processed, {
      lastModified: JSON.stringify(nextCursor),
    });
    logIngest(
      tag,
      `✓ done — ${processed} records across ${Object.keys(watermarks).length} ecosystems in ${fmtElapsed(startedAt)}`,
    );
  } catch (e) {
    // Anything can be thrown, so do not assume an Error instance.
    const msg = e instanceof Error ? e.message : String(e);
    // Best effort: persist whatever is still buffered. A secondary failure
    // here is deliberately ignored so it cannot mask the original error.
    try {
      await flush();
    } catch {
      /* ignore secondary failure */
    }
    // Recording the error state must not replace the original error either.
    try {
      await recordIngestionState(db, "osv", "error", processed, {
        error: `bulk bootstrap: ${msg}`,
      });
    } catch (stateErr) {
      const stateMsg = stateErr instanceof Error ? stateErr.message : String(stateErr);
      logIngest(tag, `✗ could not record error state: ${stateMsg}`);
    }
    logIngest(tag, `✗ failed in ${fmtElapsed(startedAt)}: ${msg}`);
    throw e;
  }
}
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/ingest/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ function makeConfig(overrides: Partial<Config> = {}): Config {
REFUSE_ENRICHMENT_CRON: "0 5 * * *",
REFUSE_DISABLE_INGEST: true, // tests never want real cron tasks running
REFUSE_BOOTSTRAP_ON_EMPTY: false,
REFUSE_OSV_CONCURRENCY: 4,
REFUSE_CARD_CACHE_SIZE: 1000,
REFUSE_CARD_CACHE_TTL_SECONDS: 60,
...overrides,
Expand Down
20 changes: 11 additions & 9 deletions apps/server/src/ingest/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { LANGUAGE_ECOSYSTEMS, CI_ECOSYSTEMS } from "@refuse/shared";
import type { Config } from "../config";
import type { CardReader } from "../cards";
import type { D1LikeDatabase } from "../db/adapter";
import { runOsvDelta, runDepsDevRefresh, runDailyEnrichment } from "./cron";
import { runOsvDelta, runOsvBootstrap, runDepsDevRefresh, runDailyEnrichment } from "./cron";

interface JobDeps {
db: D1LikeDatabase;
Expand Down Expand Up @@ -113,7 +113,10 @@ export function buildScheduler(
config: Config,
deps: JobDeps,
): Scheduler {
const osvJob = makeJob("osv-delta", () => runOsvDelta(deps.db, deps.cards));
const osvJob = makeJob("osv-delta", () =>
runOsvDelta(deps.db, deps.cards, { concurrency: config.REFUSE_OSV_CONCURRENCY }),
);
const osvBulkJob = makeJob("osv-bulk", () => runOsvBootstrap(deps.db, deps.cards));
const depsDevJob = makeJob("deps-dev", () => runDepsDevRefresh(deps.db, deps.cards));
const enrichJob = makeJob("enrichment", () =>
runDailyEnrichment(deps.db, deps.cards, deps.githubToken !== undefined ? { GITHUB_TOKEN: deps.githubToken } : {}),
Expand Down Expand Up @@ -160,17 +163,16 @@ export function buildScheduler(
// tick that fires later just no-ops.
if (config.REFUSE_BOOTSTRAP_ON_EMPTY) {
const done = readSourcesDone(rawDb);
const vulnRow = rawDb
.prepare(`SELECT COUNT(*) AS n FROM vulnerabilities`)
.get() as { n: number } | undefined;
const vulnsEmpty = !vulnRow || vulnRow.n === 0;
const enrichmentSources = ["kev", "epss", "ghsa_direct", "wolfi"] as const;
const missingEnrichment = enrichmentSources.filter((s) => !done.has(s));

const kicks: string[] = [];
if (vulnsEmpty || !done.has("osv")) {
kicks.push("osv");
osvJob().catch(() => {});
if (!done.has("osv")) {
// First boot: use the bulk all.zip — pulls every ecosystem in
// one streaming download. ~200 MB compressed, ~2-3 min total —
// ~50× faster than walking the 26-ecosystem rotation at 1/tick.
kicks.push("osv:bulk (all ecosystems in one pass)");
osvBulkJob().catch(() => {});
}
if (!done.has("deps_dev")) {
kicks.push("deps-dev");
Expand Down
Loading