From f4e8e65ae373db0a8163b3614519c6776d70517e Mon Sep 17 00:00:00 2001 From: gok03 Date: Wed, 10 Jun 2026 13:07:53 +0530 Subject: [PATCH] feat(ingest): visual progress bars in docker logs for every source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `docker logs -f refuse` previously gave the user nothing to read between "cron started" and the first "osv-delta ok in 60103 ms". Five sources plus deps.dev and they had no idea which were running, which were done, or how far through they were. Add structured progress lines per source with a 20-char ASCII bar: refuse: ingest[osv:npm ] ▶ starting (ecosystem 1/28) refuse: ingest[osv:npm ] [██████░░░░░░░░░░░░░░] 30% • 9000 records • 12s refuse: ingest[osv:npm ] [████████████████░░░░] 80% • 24000 records • 35s refuse: ingest[osv:npm ] ✓ done — 30142 records in 41s refuse: ingest[kev ] ▶ starting refuse: ingest[kev ] [████████████████████] 100% • 1542/1542 entries refuse: ingest[kev ] ✓ done — 1542 entries in 1.8s refuse: ingest[epss ] ▶ starting refuse: ingest[epss ] [████████████████████] 87% • 245678 rows scored 2026-06-09 refuse: ingest[epss ] ✓ done — 245678 rows in 14s Streaming sources (OSV per-ecosystem, EPSS) don't know their total until they finish, so the bar fills against a calibrated per-ecosystem estimate (npm: 30K, PyPI: 20K, Maven: 15K, …, distros: 5K default; EPSS: 280K). The bar holds near the cap if we overshoot — the trailing `done` line states the actual count. Known-total sources (KEV, GHSA page-of-100, Wolfi) show real percentages. Tag column is fixed-width so the bars line up; ticks every 5 s so the output streams instead of arriving in one wall at the end. --- apps/server/src/ingest/cron.ts | 232 +++++++++++++++++++++++++-------- 1 file changed, 177 insertions(+), 55 deletions(-) diff --git a/apps/server/src/ingest/cron.ts b/apps/server/src/ingest/cron.ts index 0fdd718..54c18b7 100644 --- a/apps/server/src/ingest/cron.ts +++ b/apps/server/src/ingest/cron.ts @@ -22,7 +22,68 @@ import type { CardReader } from "../cards"; * Each run fetches one ecosystem's archive, processes records whose * `modified` timestamp is newer than the per-ecosystem watermark, and updates * the watermark. + * + * Each ingest step emits structured progress lines so `docker logs` shows a + * "loading bar" of what's actually happening per source: + * + * refuse: ingest[osv:npm] ▶ starting (ecosystem 1/28) + * refuse: ingest[osv:npm] 5000 records • 12s + * refuse: ingest[osv:npm] ✓ done — 14502 records in 41s + */ + +/** Consistent prefix for ingest progress logs — greppable + alignable. */ +function logIngest(tag: string, msg: string): void { + // Pad tag to a fixed width so output columns stay aligned across sources. + // 18 chars covers "osv:Rocky Linux:9" with room for one more char of slack. + console.log(`refuse: ingest[${tag.padEnd(18)}] ${msg}`); +} + +/** Format an elapsed-ms count as a short string (1.2s / 14s / 1m23s). */ +function fmtElapsed(startedAt: number): string { + const ms = Date.now() - startedAt; + if (ms < 1000) return `${ms}ms`; + if (ms < 60_000) return `${(ms / 1000).toFixed(ms < 10_000 ? 1 : 0)}s`; + const m = Math.floor(ms / 60_000); + const s = Math.round((ms - m * 60_000) / 1000); + return `${m}m${s}s`; +} + +/** + * Render a 20-char ASCII progress bar. Pct ∈ [0, 1] is clamped — streaming + * sources pass an estimated pct that may "stick" near 99% until the source + * completes; the authoritative final count is shown in the trailing `done` + * line so the bar is decorative-but-honest rather than misleading. + */ +function bar(pct: number, width = 20): string { + const clamped = Math.max(0, Math.min(1, pct)); + const filled = Math.round(clamped * width); + return `[${"█".repeat(filled)}${"░".repeat(width - filled)}] ${Math.floor(clamped * 100) + .toString() + .padStart(3)}%`; +} + +/** + * Calibrated soft-caps for streaming sources. We don't know how many records + * a given OSV ecosystem zip will yield until we finish reading it, so we pick + * a per-ecosystem ceiling that npm-class ecosystems comfortably saturate and + * smaller ecosystems mark "fast-and-done" against. The bar holds at ~99% if + * we overshoot the estimate — the `done` line tells the truth. */ +const OSV_ECOSYSTEM_ESTIMATE: Record = { + npm: 30_000, + PyPI: 20_000, + Maven: 15_000, + Go: 8_000, + "crates.io": 4_000, + RubyGems: 3_000, + NuGet: 4_000, + Packagist: 2_000, + Hex: 500, + Pub: 200, + "GitHub Actions": 500, +}; +const OSV_ECOSYSTEM_ESTIMATE_DEFAULT = 5_000; // distros + anything unseen +const EPSS_ESTIMATE = 280_000; // Versioned distro identifiers OSV publishes per-ecosystem zips for. These // expand check_dockerfile coverage to cover apt/apk/dnf packages on the @@ -91,8 +152,10 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis const ecosystem = ROTATION[cursor.next_index % ROTATION.length]!; const watermark = cursor.watermarks[ecosystem] ?? "1970-01-01T00:00:00Z"; + const tag = `osv:${ecosystem}`; let processed = 0; + let lastReportedAt = 0; const allChanged = new Map(); let newWatermark = watermark; const startedAt = Date.now(); @@ -107,8 +170,20 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis for (const k of result.affected_pairs_changed) { allChanged.set(`${k.ecosystem}::${k.package_name}`, k); } + // Emit progress every ~5s so docker-logs viewers see a steady tick + // instead of one wall of text at the end. + if (Date.now() - lastReportedAt >= 5000) { + const est = OSV_ECOSYSTEM_ESTIMATE[ecosystem] ?? OSV_ECOSYSTEM_ESTIMATE_DEFAULT; + logIngest(tag, `${bar(processed / est)} • ${processed} records • ${fmtElapsed(startedAt)}`); + lastReportedAt = Date.now(); + } }; + logIngest( + tag, + `▶ starting (ecosystem ${(cursor.next_index % ROTATION.length) + 1}/${ROTATION.length})`, + ); + try { const body = await osv.openEcosystemArchive(ecosystem); // OSV stopped publishing this ecosystem (typically a distro after EOL). @@ -122,6 +197,7 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis await recordIngestionState(db, "osv", "ok", 0, { lastModified: JSON.stringify(skipCursor), error: `${ecosystem}: archive 404 (skipped)`}); + logIngest(tag, `· archive 404 — skipping, advancing rotation`); return; } const { stopped } = await streamZipRecords(body, { @@ -161,6 +237,10 @@ export async function runOsvDelta(db: D1LikeDatabase, cards: CardReader): Promis watermarks: { ...cursor.watermarks, [ecosystem]: newWatermark }}; await recordIngestionState(db, "osv", "ok", processed, { lastModified: JSON.stringify(nextCursor)}); + logIngest( + tag, + `✓ done — ${processed} records in ${fmtElapsed(startedAt)}${stopped ? " (budget hit, resumes next tick)" : ""}`, + ); } catch (e) { // Try to flush whatever made it through before the failure so we don't // re-process those records on the next run. @@ -212,12 +292,25 @@ export async function runDepsDevRefresh(db: D1LikeDatabase, cards: CardReader): .all<{ ecosystem: string; package_name: string }>(); const targets = packagesRes.results ?? []; + const depsT0 = Date.now(); + logIngest("deps-dev", `▶ starting (${targets.length} packages in this batch)`); + let processed = 0; + let lastDepsReportedAt = 0; + let pkgIndex = 0; const changed: AffectedKey[] = []; const failures: string[] = []; const FAILURE_LIMIT = 50; for (const { ecosystem, package_name } of targets) { + pkgIndex++; + if (Date.now() - lastDepsReportedAt >= 5000) { + logIngest( + "deps-dev", + `${bar(pkgIndex / Math.max(1, targets.length))} • ${pkgIndex}/${targets.length} packages • ${fmtElapsed(depsT0)}`, + ); + lastDepsReportedAt = Date.now(); + } try { const pkg = await fetcher.getPackageVersions(ecosystem, package_name); if (!pkg) continue; @@ -310,6 +403,10 @@ export async function runDepsDevRefresh(db: D1LikeDatabase, cards: CardReader): errorOpt.error = `${failures.length} per-package failures (this batch): ${failures.slice(0, 3).join("; ")}`; } await recordIngestionState(db, "deps_dev", "ok", processed, errorOpt); + logIngest( + "deps-dev", + `✓ done — ${processed} version rows across ${targets.length} packages in ${fmtElapsed(depsT0)}${failures.length ? ` (${failures.length} per-package failures)` : ""}`, + ); } /** @@ -324,69 +421,94 @@ export async function runDailyEnrichment( ): Promise { const errors: string[] = []; - try { - const r = await refreshKev(db); - await recordIngestionState(db, "kev", "ok", r.upserted); - console.log(`KEV: fetched=${r.fetched} upserted=${r.upserted}`); - } catch (e) { - const msg = (e as Error).message; - errors.push(`KEV: ${msg}`); - await recordIngestionState(db, "kev", "error", 0, { error: msg }).catch(() => {}); + { + const t0 = Date.now(); + logIngest("kev", "▶ starting"); + try { + const r = await refreshKev(db); + await recordIngestionState(db, "kev", "ok", r.upserted); + logIngest("kev", `${bar(1)} • ${r.upserted}/${r.fetched} entries`); + logIngest("kev", `✓ done — ${r.upserted} entries in ${fmtElapsed(t0)}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`KEV: ${msg}`); + await recordIngestionState(db, "kev", "error", 0, { error: msg }).catch(() => {}); + logIngest("kev", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); + } } - try { - const r = await refreshEpss(db); - await recordIngestionState(db, "epss", "ok", r.upserted, { - ...(r.scored_date ? { lastModified: r.scored_date } : {}), - }); - console.log(`EPSS: fetched=${r.fetched} upserted=${r.upserted} date=${r.scored_date}`); - } catch (e) { - const msg = (e as Error).message; - errors.push(`EPSS: ${msg}`); - await recordIngestionState(db, "epss", "error", 0, { error: msg }).catch(() => {}); + { + const t0 = Date.now(); + logIngest("epss", "▶ starting"); + try { + const r = await refreshEpss(db); + await recordIngestionState(db, "epss", "ok", r.upserted, { + ...(r.scored_date ? { lastModified: r.scored_date } : {}), + }); + logIngest("epss", `${bar(r.upserted / EPSS_ESTIMATE)} • ${r.upserted} rows scored ${r.scored_date ?? ""}`); + logIngest("epss", `✓ done — ${r.upserted} rows in ${fmtElapsed(t0)}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`EPSS: ${msg}`); + await recordIngestionState(db, "epss", "error", 0, { error: msg }).catch(() => {}); + logIngest("epss", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); + } } - try { - const stateRow = await db - .prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'ghsa_direct'`) - .first<{ last_modified: string | null }>(); - const cursor = stateRow?.last_modified ?? undefined; - const { records, cursor: nextCursor } = await pullGhsaPage( - options.GITHUB_TOKEN, - 100, - cursor, - ); - if (records.length > 0) { - const normalized = records - .map((r) => normalizeOsv(r)) - .filter((n): n is NonNullable => n !== null); - const result = await upsertRecords(db, normalized); - await publishCards(db, cards, result.affected_pairs_changed, { - concurrency: 16}); + { + const t0 = Date.now(); + try { + const stateRow = await db + .prepare(`SELECT last_modified FROM ingestion_state WHERE source = 'ghsa_direct'`) + .first<{ last_modified: string | null }>(); + const cursor = stateRow?.last_modified ?? undefined; + logIngest("ghsa", `▶ starting (${cursor ? "resuming from saved cursor" : "no prior cursor"})`); + const { records, cursor: nextCursor } = await pullGhsaPage( + options.GITHUB_TOKEN, + 100, + cursor, + ); + if (records.length > 0) { + const normalized = records + .map((r) => normalizeOsv(r)) + .filter((n): n is NonNullable => n !== null); + const result = await upsertRecords(db, normalized); + await publishCards(db, cards, result.affected_pairs_changed, { + concurrency: 16}); + } + await recordIngestionState(db, "ghsa_direct", "ok", records.length, { + ...(nextCursor ? { lastModified: nextCursor } : {})}); + logIngest("ghsa", `${bar(records.length / 100)} • ${records.length}/100 records this page`); + logIngest("ghsa", `✓ done — ${records.length} records in ${fmtElapsed(t0)}${nextCursor ? " (cursor saved)" : " (caught up)"}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`GHSA direct: ${msg}`); + logIngest("ghsa", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); } - await recordIngestionState(db, "ghsa_direct", "ok", records.length, { - ...(nextCursor ? { lastModified: nextCursor } : {})}); - console.log(`GHSA direct: imported=${records.length}`); - } catch (e) { - errors.push(`GHSA direct: ${(e as Error).message}`); } - try { - const { records, stats } = await pullWolfi(); - if (records.length > 0) { - const normalized = records - .map((r) => normalizeOsv(r)) - .filter((n): n is NonNullable => n !== null); - const result = await upsertRecords(db, normalized); - await publishCards(db, cards, result.affected_pairs_changed, { - concurrency: 16}); + { + const t0 = Date.now(); + logIngest("wolfi", "▶ starting"); + try { + const { records, stats } = await pullWolfi(); + if (records.length > 0) { + const normalized = records + .map((r) => normalizeOsv(r)) + .filter((n): n is NonNullable => n !== null); + const result = await upsertRecords(db, normalized); + await publishCards(db, cards, result.affected_pairs_changed, { + concurrency: 16}); + } + await recordIngestionState(db, "wolfi", "ok", records.length); + logIngest("wolfi", `${bar(1)} • ${stats.packages} packages, ${stats.records} records`); + logIngest("wolfi", `✓ done — ${stats.records} records across ${stats.packages} packages in ${fmtElapsed(t0)}`); + } catch (e) { + const msg = (e as Error).message; + errors.push(`Wolfi: ${msg}`); + await recordIngestionState(db, "wolfi", "error", 0, { error: msg }).catch(() => {}); + logIngest("wolfi", `✗ failed in ${fmtElapsed(t0)}: ${msg}`); } - await recordIngestionState(db, "wolfi", "ok", records.length); - console.log(`Wolfi: packages=${stats.packages} records=${stats.records}`); - } catch (e) { - const msg = (e as Error).message; - errors.push(`Wolfi: ${msg}`); - await recordIngestionState(db, "wolfi", "error", 0, { error: msg }).catch(() => {}); } await recordIngestionState(