diff --git a/apps/server/src/ingest/cron.ts b/apps/server/src/ingest/cron.ts index 0fdd718..54c18b7 100644 --- a/apps/server/src/ingest/cron.ts +++ b/apps/server/src/ingest/cron.ts @@ -22,7 +22,68 @@ import type { CardReader } from "../cards"; * Each run fetches one ecosystem's archive, processes records whose * `modified` timestamp is newer than the per-ecosystem watermark, and updates * the watermark. + * + * Each ingest step emits structured progress lines so `docker logs` shows a + * "loading bar" of what's actually happening per source: + * + * refuse: ingest[osv:npm] ▶ starting (ecosystem 1/28) + * refuse: ingest[osv:npm] 5000 records • 12s + * refuse: ingest[osv:npm] ✓ done — 14502 records in 41s + */ + +/** Consistent prefix for ingest progress logs — greppable + alignable. */ +function logIngest(tag: string, msg: string): void { + // Pad tag to a fixed width so output columns stay aligned across sources. + // 18 chars covers "osv:Rocky Linux:9" with room for one more char of slack. + console.log(`refuse: ingest[${tag.padEnd(18)}] ${msg}`); +} + +/** Format an elapsed-ms count as a short string (1.2s / 14s / 1m23s). */ +function fmtElapsed(startedAt: number): string { + const ms = Date.now() - startedAt; + if (ms < 1000) return `${ms}ms`; + if (ms < 60_000) return `${(ms / 1000).toFixed(ms < 10_000 ? 1 : 0)}s`; + const m = Math.floor(ms / 60_000); + const s = Math.round((ms - m * 60_000) / 1000); + return `${m}m${s}s`; +} + +/** + * Render a 20-char ASCII progress bar. Pct ∈ [0, 1] is clamped — streaming + * sources pass an estimated pct that may "stick" near 99% until the source + * completes; the authoritative final count is shown in the trailing `done` + * line so the bar is decorative-but-honest rather than misleading. + */ +function bar(pct: number, width = 20): string { + const clamped = Math.max(0, Math.min(1, pct)); + const filled = Math.round(clamped * width); + return `[${"█".repeat(filled)}${"░".repeat(width - filled)}] ${Math.floor(clamped * 100) + .toString() + .padStart(3)}%`; +} + +/** + * Calibrated soft-caps for streaming sources. We don't know how many records + * a given OSV ecosystem zip will yield until we finish reading it, so we pick + * a per-ecosystem ceiling that npm-class ecosystems comfortably saturate and + * smaller ecosystems mark "fast-and-done" against. The bar holds at ~99% if + * we overshoot the estimate — the `done` line tells the truth. */ +const OSV_ECOSYSTEM_ESTIMATE: Record = { + npm: 30_000, + PyPI: 20_000, + Maven: 15_000, + Go: 8_000, + "crates.io": 4_000, + RubyGems: 3_000, + NuGet: 4_000, + Packagist: 2_000, + Hex: 500, + Pub: 200, + "GitHub Actions": 500, +}; +const OSV_ECOSYSTEM_ESTIMATE_DEFAULT = 5_000; // distros + anything unseen +const EPSS_ESTIMATE = 280_000; // Versioned distro identifiers OSV publishes per-ecosystem zips for. These // expand check_dockerfile coverage to cover apt/apk/dnf packages on the @@ -91,8 +152,10 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis const ecosystem = ROTATION[cursor.next_index % ROTATION.length]!; const watermark = cursor.watermarks[ecosystem] ?? "1970-01-01T00:00:00Z"; + const tag = `osv:${ecosystem}`; let processed = 0; + let lastReportedAt = 0; const allChanged = new Map(); let newWatermark = watermark; const startedAt = Date.now(); @@ -107,8 +170,20 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis for (const k of result.affected_pairs_changed) { allChanged.set(`${k.ecosystem}::${k.package_name}`, k); } + // Emit progress every ~5s so docker-logs viewers see a steady tick + // instead of one wall of text at the end. + if (Date.now() - lastReportedAt >= 5000) { + const est = OSV_ECOSYSTEM_ESTIMATE[ecosystem] ?? OSV_ECOSYSTEM_ESTIMATE_DEFAULT; + logIngest(tag, `${bar(processed / est)} • ${processed} records • ${fmtElapsed(startedAt)}`); + lastReportedAt = Date.now(); + } }; + logIngest( + tag, + `▶ starting (ecosystem ${(cursor.next_index % ROTATION.length) + 1}/${ROTATION.length})`, + ); + try { const body = await osv.openEcosystemArchive(ecosystem); // OSV stopped publishing this ecosystem (typically a distro after EOL). @@ -122,6 +197,7 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis await recordIngestionState(db, "osv", "ok", 0, { lastModified: JSON.stringify(skipCursor), error: `${ecosystem}: archive 404 (skipped)`}); + logIngest(tag, `· archive 404 — skipping, advancing rotation`); return; } const { stopped } = await streamZipRecords(body, { @@ -161,6 +237,10 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }}; await recordIngestionState(db, "osv", "ok", processed, { lastModified: JSON.stringify(nextCursor)}); + logIngest( + tag, + `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${stopped ? " (budget hit, resumes next tick)" : ""}`, + ); } catch (e) { // Try to flush whatever made it through before the failure so we don't // re-process those records on the next run. @@ -212,12 +292,25 @@ export async function runDepsDevRefresh(db: D1LikeDatabase, cards: CardReader): .all<{ ecosystem: string; package_name: string }>(); const targets = packagesRes.results ?? []; + const depsT0 = Date.now(); + logIngest("deps-dev", `▶ starting (${targets.length} packages in this batch)`); + let processed = 0; + let lastDepsReportedAt = 0; + let pkgIndex = 0; const changed: AffectedKey[] = []; const failures: string[] = []; const FAILURE_LIMIT = 50; for (const { ecosystem, package_name } of targets) { + pkgIndex++; + if (Date.now() - lastDepsReportedAt >= 5000) { + logIngest( + "deps-dev", + `${bar(pkgIndex / Math.max(1, targets.length))} • ${pkgIndex}/${targets.length} packages • ${fmtElapsed(depsT0)}`, + ); + lastDepsReportedAt = Date.now(); + } try { const pkg = await fetcher.getPackageVersions(ecosystem, package_name); if (!pkg) continue; @@ -310,6 +403,10 @@ export async function runDepsDevRefresh(db: D1LikeDatabase, cards: CardReader): errorOpt.error = `${failures.length} per-package failures (this batch): ${failures.slice(0, 3).join("; ")}`; } await recordIngestionState(db, "deps_dev", "ok", processed, errorOpt); + logIngest( + "deps-dev", + `✓ done — ${processed} version rows across ${targets.length} packages in ${fmtElapsed(depsT0)}${failures.length ? ` (${failures.length} per-package failures)` : ""}`, + ); } /** @@ -324,69 +421,94 @@ export async function runDailyEnrichment( ): Promise { const errors: string[] = []; - try { - const r = await refreshKev(db); - await recordIngestionState(db, "kev", "ok", r.upserted); - console.log(`KEV: fetched=${r.fetched} upserted=${r.upserted}`); - } catch (e) { - const msg = (e as Error).message; - errors.push(`KEV: ${msg}`); - await recordIngestionState(db, "kev", "error", 0, { error: msg }).catch(() => {}); + { + const t0 = Date.now(); + logIngest("kev", "▶ starting"); + try { + const r = await refreshKev(db); + await recordIngestionState(db, "kev", "ok", r.upserted); + logIngest("kev", `${bar(1)} • ${r.upserted}/${r.fetched} entries`); + logIngest("kev", `✓ done — ${r.upserted} entries in ${fmtElapsed(t0)}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`KEV: ${msg}`); + await recordIngestionState(db, "kev", "error", 0, { error: msg }).catch(() => {}); + logIngest("kev", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); + } } - try { - const r = await refreshEpss(db); - await recordIngestionState(db, "epss", "ok", r.upserted, { - ...(r.scored_date ? { lastModified: r.scored_date } : {}), - }); - console.log(`EPSS: fetched=${r.fetched} upserted=${r.upserted} date=${r.scored_date}`); - } catch (e) { - const msg = (e as Error).message; - errors.push(`EPSS: ${msg}`); - await recordIngestionState(db, "epss", "error", 0, { error: msg }).catch(() => {}); + { + const t0 = Date.now(); + logIngest("epss", "▶ starting"); + try { + const r = await refreshEpss(db); + await recordIngestionState(db, "epss", "ok", r.upserted, { + ...(r.scored_date ? { lastModified: r.scored_date } : {}), + }); + logIngest("epss", `${bar(r.upserted / EPSS_ESTIMATE)} • ${r.upserted} rows scored ${r.scored_date ?? ""}`); + logIngest("epss", `✓ done — ${r.upserted} rows in ${fmtElapsed(t0)}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`EPSS: ${msg}`); + await recordIngestionState(db, "epss", "error", 0, { error: msg }).catch(() => {}); + logIngest("epss", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); + } } - try { - const stateRow = await db - .prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'ghsa_direct'`) - .first<{ last_modified: string | null }>(); - const cursor = stateRow?.last_modified ?? undefined; - const { records, cursor: nextCursor } = await pullGhsaPage( - options.GITHUB_TOKEN, - 100, - cursor, - ); - if (records.length > 0) { - const normalized = records - .map((r) => normalizeOsv(r)) - .filter((n): n is NonNullable => n !== null); - const result = await upsertRecords(db, normalized); - await publishCards(db, cards, result.affected_pairs_changed, { - concurrency: 16}); + { + const t0 = Date.now(); + try { + const stateRow = await db + .prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'ghsa_direct'`) + .first<{ last_modified: string | null }>(); + const cursor = stateRow?.last_modified ?? undefined; + logIngest("ghsa", `▶ starting (${cursor ? "resuming from saved cursor" : "no prior cursor"})`); + const { records, cursor: nextCursor } = await pullGhsaPage( + options.GITHUB_TOKEN, + 100, + cursor, + ); + if (records.length > 0) { + const normalized = records + .map((r) => normalizeOsv(r)) + .filter((n): n is NonNullable => n !== null); + const result = await upsertRecords(db, normalized); + await publishCards(db, cards, result.affected_pairs_changed, { + concurrency: 16}); + } + await recordIngestionState(db, "ghsa_direct", "ok", records.length, { + ...(nextCursor ? { lastModified: nextCursor } : {})}); + logIngest("ghsa", `${bar(records.length / 100)} • ${records.length}/100 records this page`); + logIngest("ghsa", `✓ done — ${records.length} records in ${fmtElapsed(t0)}${nextCursor ? " (cursor saved)" : " (caught up)"}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`GHSA direct: ${msg}`); + logIngest("ghsa", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); } - await recordIngestionState(db, "ghsa_direct", "ok", records.length, { - ...(nextCursor ? { lastModified: nextCursor } : {})}); - console.log(`GHSA direct: imported=${records.length}`); - } catch (e) { - errors.push(`GHSA direct: ${(e as Error).message}`); } - try { - const { records, stats } = await pullWolfi(); - if (records.length > 0) { - const normalized = records - .map((r) => normalizeOsv(r)) - .filter((n): n is NonNullable => n !== null); - const result = await upsertRecords(db, normalized); - await publishCards(db, cards, result.affected_pairs_changed, { - concurrency: 16}); + { + const t0 = Date.now(); + logIngest("wolfi", "▶ starting"); + try { + const { records, stats } = await pullWolfi(); + if (records.length > 0) { + const normalized = records + .map((r) => normalizeOsv(r)) + .filter((n): n is NonNullable => n !== null); + const result = await upsertRecords(db, normalized); + await publishCards(db, cards, result.affected_pairs_changed, { + concurrency: 16}); + } + await recordIngestionState(db, "wolfi", "ok", records.length); + logIngest("wolfi", `${bar(1)} • ${stats.packages} packages, ${stats.records} records`); + logIngest("wolfi", `✓ done — ${stats.records} records across ${stats.packages} packages in ${fmtElapsed(t0)}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`Wolfi: ${msg}`); + await recordIngestionState(db, "wolfi", "error", 0, { error: msg }).catch(() => {}); + logIngest("wolfi", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); } - await recordIngestionState(db, "wolfi", "ok", records.length); - console.log(`Wolfi: packages=${stats.packages} records=${stats.records}`); - } catch (e) { - const msg = (e as Error).message; - errors.push(`Wolfi: ${msg}`); - await recordIngestionState(db, "wolfi", "error", 0, { error: msg }).catch(() => {}); } await recordIngestionState(