Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 177 additions & 55 deletions apps/server/src/ingest/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,68 @@ import type { CardReader } from "../cards";
* Each run fetches one ecosystem's archive, processes records whose
* `modified` timestamp is newer than the per-ecosystem watermark, and updates
* the watermark.
*
* Each ingest step emits structured progress lines so `docker logs` shows a
* "loading bar" of what's actually happening per source:
*
* refuse: ingest[osv:npm           ] ▶ starting (ecosystem 1/28)
* refuse: ingest[osv:npm           ] [███░░░░░░░░░░░░░░░░░]  16% • 5000 records • 12s
* refuse: ingest[osv:npm           ] ✓ done — 14502 records in 41s
*/

/**
 * Write one ingest progress line to stdout with the shared
 * `refuse: ingest[<tag>]` prefix, so every source's output can be grepped
 * with one pattern and lines up in columns.
 *
 * @param tag Source identifier shown inside the brackets (e.g. `osv:npm`).
 * @param msg Free-form message printed after the bracketed tag.
 */
function logIngest(tag: string, msg: string): void {
  // Right-pad the tag to 18 columns: the longest expected tag,
  // "osv:Rocky Linux:9", is 17 chars, leaving one column of slack.
  // Longer tags are not truncated — padEnd only ever adds characters.
  const paddedTag = tag.padEnd(18);
  const line = "refuse: ingest[" + paddedTag + "] " + msg;
  console.log(line);
}

/**
 * Format the time elapsed since `startedAt` as a short string
 * (`850ms` / `1.2s` / `14s` / `1m23s`).
 *
 * Rounding happens once, on the total, before the value is split into
 * units. That way a carry can never produce an out-of-range component such
 * as `1m60s`, `60s`, or `10.0s`.
 *
 * @param startedAt Epoch-millisecond timestamp (as returned by `Date.now()`)
 *   taken when the measured operation began.
 * @returns Human-readable elapsed time relative to the current `Date.now()`.
 */
function fmtElapsed(startedAt: number): string {
  // Date.now() is wall-clock time and can step backwards; never print a
  // negative duration.
  const ms = Math.max(0, Date.now() - startedAt);
  if (ms < 1000) return `${ms}ms`;
  // One decimal only while the rounded value stays below 10; from 9.95s
  // upwards the one-decimal rendering would read "10.0s".
  if (ms < 9_950) return `${(ms / 1000).toFixed(1)}s`;
  // Round to whole seconds first, then split, so 59.7s becomes "1m0s" and
  // 119.7s becomes "2m0s" rather than "60s" / "1m60s".
  const totalSeconds = Math.round(ms / 1000);
  if (totalSeconds < 60) return `${totalSeconds}s`;
  const minutes = Math.floor(totalSeconds / 60);
  const seconds = totalSeconds % 60;
  return `${minutes}m${seconds}s`;
}

/**
 * Render an ASCII progress bar followed by a right-aligned percentage,
 * e.g. `[██████████░░░░░░░░░░]  50%`.
 *
 * `pct` is clamped to [0, 1]. Streaming sources pass an *estimated* fraction,
 * so an overshoot simply pins the bar at 100% until the source completes; the
 * authoritative count is whatever the caller prints alongside the bar.
 *
 * @param pct Fraction complete. Values outside [0, 1] are clamped; `NaN`
 *   (e.g. from `0 / 0` or an undefined numerator) renders as 0%.
 * @param width Number of cells between the brackets (default 20). Coerced to
 *   a non-negative integer; a non-finite value falls back to 20.
 * @returns The bar, a space, and the percentage padded to 3 characters plus `%`.
 */
function bar(pct: number, width = 20): string {
  // NaN passes straight through Math.max/Math.min and would render as
  // "[] NaN%", so map it to "no progress" explicitly.
  const clamped = Number.isNaN(pct) ? 0 : Math.max(0, Math.min(1, pct));
  // String.prototype.repeat throws RangeError on a negative count.
  const cells = Number.isFinite(width) ? Math.max(0, Math.floor(width)) : 20;
  const filled = Math.round(clamped * cells);
  const percent = Math.floor(clamped * 100)
    .toString()
    .padStart(3);
  return `[${"█".repeat(filled)}${"░".repeat(cells - filled)}] ${percent}%`;
}

/**
* Calibrated soft-caps for streaming sources. We don't know how many records
* a given OSV ecosystem zip will yield until we finish reading it, so we pick
* a per-ecosystem ceiling that npm-class ecosystems comfortably saturate and
* smaller ecosystems mark "fast-and-done" against. These values are used only
* as the denominator of the progress bar; `bar` clamps its input to 1, so the
* bar pins at 100% if we overshoot the estimate — the `done` line tells the
* truth.
*
* Keys are OSV ecosystem names, looked up with the same string that is used
* for the `osv:<ecosystem>` log tag.
*/
const OSV_ECOSYSTEM_ESTIMATE: Record<string, number> = {
npm: 30_000,
PyPI: 20_000,
Maven: 15_000,
Go: 8_000,
"crates.io": 4_000,
RubyGems: 3_000,
NuGet: 4_000,
Packagist: 2_000,
Hex: 500,
Pub: 200,
"GitHub Actions": 500,
};
// Fallback estimate for any ecosystem without an entry in the table above.
const OSV_ECOSYSTEM_ESTIMATE_DEFAULT = 5_000; // distros + anything unseen
// Estimated EPSS row count; the denominator of the EPSS progress bar.
const EPSS_ESTIMATE = 280_000;

// Versioned distro identifiers OSV publishes per-ecosystem zips for. These
// expand check_dockerfile coverage to cover apt/apk/dnf packages on the
Expand Down Expand Up @@ -91,8 +152,10 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis

const ecosystem = ROTATION[cursor.next_index % ROTATION.length]!;
const watermark = cursor.watermarks[ecosystem] ?? "1970-01-01T00:00:00Z";
const tag = `osv:${ecosystem}`;

let processed = 0;
let lastReportedAt = 0;
const allChanged = new Map<string, AffectedKey>();
let newWatermark = watermark;
const startedAt = Date.now();
Expand All @@ -107,8 +170,20 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis
for (const k of result.affected_pairs_changed) {
allChanged.set(`${k.ecosystem}::${k.package_name}`, k);
}
// Emit progress every ~5s so docker-logs viewers see a steady tick
// instead of one wall of text at the end.
if (Date.now() - lastReportedAt >= 5000) {
const est = OSV_ECOSYSTEM_ESTIMATE[ecosystem] ?? OSV_ECOSYSTEM_ESTIMATE_DEFAULT;
logIngest(tag, `${bar(processed / est)} • ${processed} records • ${fmtElapsed(startedAt)}`);
lastReportedAt = Date.now();
}
};

logIngest(
tag,
`▶ starting (ecosystem ${(cursor.next_index % ROTATION.length) + 1}/${ROTATION.length})`,
);

try {
const body = await osv.openEcosystemArchive(ecosystem);
// OSV stopped publishing this ecosystem (typically a distro after EOL).
Expand All @@ -122,6 +197,7 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis
await recordIngestionState(db, "osv", "ok", 0, {
lastModified: JSON.stringify(skipCursor),
error: `${ecosystem}: archive 404 (skipped)`});
logIngest(tag, `· archive 404 — skipping, advancing rotation`);
return;
}
const { stopped } = await streamZipRecords(body, {
Expand Down Expand Up @@ -161,6 +237,10 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis
watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }};
await recordIngestionState(db, "osv", "ok", processed, {
lastModified: JSON.stringify(nextCursor)});
logIngest(
tag,
`✓ done — ${processed} records in ${fmtElapsed(startedAt)}${stopped ? " (budget hit, resumes next tick)" : ""}`,
);
} catch (e) {
// Try to flush whatever made it through before the failure so we don't
// re-process those records on the next run.
Expand Down Expand Up @@ -212,12 +292,25 @@ export async function runDepsDevRefresh(db: D1LikeDatabase, cards: CardReader):
.all<{ ecosystem: string; package_name: string }>();
const targets = packagesRes.results ?? [];

const depsT0 = Date.now();
logIngest("deps-dev", `▶ starting (${targets.length} packages in this batch)`);

let processed = 0;
let lastDepsReportedAt = 0;
let pkgIndex = 0;
const changed: AffectedKey[] = [];
const failures: string[] = [];
const FAILURE_LIMIT = 50;

for (const { ecosystem, package_name } of targets) {
pkgIndex++;
if (Date.now() - lastDepsReportedAt >= 5000) {
logIngest(
"deps-dev",
`${bar(pkgIndex / Math.max(1, targets.length))} • ${pkgIndex}/${targets.length} packages • ${fmtElapsed(depsT0)}`,
);
lastDepsReportedAt = Date.now();
}
try {
const pkg = await fetcher.getPackageVersions(ecosystem, package_name);
if (!pkg) continue;
Expand Down Expand Up @@ -310,6 +403,10 @@ export async function runDepsDevRefresh(db: D1LikeDatabase, cards: CardReader):
errorOpt.error = `${failures.length} per-package failures (this batch): ${failures.slice(0, 3).join("; ")}`;
}
await recordIngestionState(db, "deps_dev", "ok", processed, errorOpt);
logIngest(
"deps-dev",
`✓ done — ${processed} version rows across ${targets.length} packages in ${fmtElapsed(depsT0)}${failures.length ? ` (${failures.length} per-package failures)` : ""}`,
);
}

/**
Expand All @@ -324,69 +421,94 @@ export async function runDailyEnrichment(
): Promise<void> {
const errors: string[] = [];

try {
const r = await refreshKev(db);
await recordIngestionState(db, "kev", "ok", r.upserted);
console.log(`KEV: fetched=${r.fetched} upserted=${r.upserted}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`KEV: ${msg}`);
await recordIngestionState(db, "kev", "error", 0, { error: msg }).catch(() => {});
{
const t0 = Date.now();
logIngest("kev", "▶ starting");
try {
const r = await refreshKev(db);
await recordIngestionState(db, "kev", "ok", r.upserted);
logIngest("kev", `${bar(1)} • ${r.upserted}/${r.fetched} entries`);
logIngest("kev", `✓ done — ${r.upserted} entries in ${fmtElapsed(t0)}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`KEV: ${msg}`);
await recordIngestionState(db, "kev", "error", 0, { error: msg }).catch(() => {});
logIngest("kev", `✗ failed in ${fmtElapsed(t0)}: ${msg}`);
}
}

try {
const r = await refreshEpss(db);
await recordIngestionState(db, "epss", "ok", r.upserted, {
...(r.scored_date ? { lastModified: r.scored_date } : {}),
});
console.log(`EPSS: fetched=${r.fetched} upserted=${r.upserted} date=${r.scored_date}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`EPSS: ${msg}`);
await recordIngestionState(db, "epss", "error", 0, { error: msg }).catch(() => {});
{
const t0 = Date.now();
logIngest("epss", "▶ starting");
try {
const r = await refreshEpss(db);
await recordIngestionState(db, "epss", "ok", r.upserted, {
...(r.scored_date ? { lastModified: r.scored_date } : {}),
});
logIngest("epss", `${bar(r.upserted / EPSS_ESTIMATE)} • ${r.upserted} rows scored ${r.scored_date ?? ""}`);
logIngest("epss", `✓ done — ${r.upserted} rows in ${fmtElapsed(t0)}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`EPSS: ${msg}`);
await recordIngestionState(db, "epss", "error", 0, { error: msg }).catch(() => {});
logIngest("epss", `✗ failed in ${fmtElapsed(t0)}: ${msg}`);
}
}

try {
const stateRow = await db
.prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'ghsa_direct'`)
.first<{ last_modified: string | null }>();
const cursor = stateRow?.last_modified ?? undefined;
const { records, cursor: nextCursor } = await pullGhsaPage(
options.GITHUB_TOKEN,
100,
cursor,
);
if (records.length > 0) {
const normalized = records
.map((r) => normalizeOsv(r))
.filter((n): n is NonNullable<typeof n> => n !== null);
const result = await upsertRecords(db, normalized);
await publishCards(db, cards, result.affected_pairs_changed, {
concurrency: 16});
{
const t0 = Date.now();
try {
const stateRow = await db
.prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'ghsa_direct'`)
.first<{ last_modified: string | null }>();
const cursor = stateRow?.last_modified ?? undefined;
logIngest("ghsa", `▶ starting (${cursor ? "resuming from saved cursor" : "no prior cursor"})`);
const { records, cursor: nextCursor } = await pullGhsaPage(
options.GITHUB_TOKEN,
100,
cursor,
);
if (records.length > 0) {
const normalized = records
.map((r) => normalizeOsv(r))
.filter((n): n is NonNullable<typeof n> => n !== null);
const result = await upsertRecords(db, normalized);
await publishCards(db, cards, result.affected_pairs_changed, {
concurrency: 16});
}
await recordIngestionState(db, "ghsa_direct", "ok", records.length, {
...(nextCursor ? { lastModified: nextCursor } : {})});
logIngest("ghsa", `${bar(records.length / 100)} • ${records.length}/100 records this page`);
logIngest("ghsa", `✓ done — ${records.length} records in ${fmtElapsed(t0)}${nextCursor ? " (cursor saved)" : " (caught up)"}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`GHSA direct: ${msg}`);
logIngest("ghsa", `✗ failed in ${fmtElapsed(t0)}: ${msg}`);
}
await recordIngestionState(db, "ghsa_direct", "ok", records.length, {
...(nextCursor ? { lastModified: nextCursor } : {})});
console.log(`GHSA direct: imported=${records.length}`);
} catch (e) {
errors.push(`GHSA direct: ${(e as Error).message}`);
}

try {
const { records, stats } = await pullWolfi();
if (records.length > 0) {
const normalized = records
.map((r) => normalizeOsv(r))
.filter((n): n is NonNullable<typeof n> => n !== null);
const result = await upsertRecords(db, normalized);
await publishCards(db, cards, result.affected_pairs_changed, {
concurrency: 16});
{
const t0 = Date.now();
logIngest("wolfi", "▶ starting");
try {
const { records, stats } = await pullWolfi();
if (records.length > 0) {
const normalized = records
.map((r) => normalizeOsv(r))
.filter((n): n is NonNullable<typeof n> => n !== null);
const result = await upsertRecords(db, normalized);
await publishCards(db, cards, result.affected_pairs_changed, {
concurrency: 16});
}
await recordIngestionState(db, "wolfi", "ok", records.length);
logIngest("wolfi", `${bar(1)} • ${stats.packages} packages, ${stats.records} records`);
logIngest("wolfi", `✓ done — ${stats.records} records across ${stats.packages} packages in ${fmtElapsed(t0)}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`Wolfi: ${msg}`);
await recordIngestionState(db, "wolfi", "error", 0, { error: msg }).catch(() => {});
logIngest("wolfi", `✗ failed in ${fmtElapsed(t0)}: ${msg}`);
}
await recordIngestionState(db, "wolfi", "ok", records.length);
console.log(`Wolfi: packages=${stats.packages} records=${stats.records}`);
} catch (e) {
const msg = (e as Error).message;
errors.push(`Wolfi: ${msg}`);
await recordIngestionState(db, "wolfi", "error", 0, { error: msg }).catch(() => {});
}

await recordIngestionState(
Expand Down