diff --git a/CHANGELOG.md b/CHANGELOG.md index e9c35108..09611dd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added + +- Added repository-level vulnerability summary (`cloudsmith vulnerabilities OWNER/REPO`) + - Aggregates scan results across all packages into a single color-coded table + - Packages sorted by total vulnerability count (descending) + - Supports `--severity` and `--fixable/--non-fixable` filters + ## [1.16.0] - 2026-03-24 ### Added diff --git a/cloudsmith_cli/cli/commands/vulnerabilities.py b/cloudsmith_cli/cli/commands/vulnerabilities.py index f140be71..9bc7fe20 100644 --- a/cloudsmith_cli/cli/commands/vulnerabilities.py +++ b/cloudsmith_cli/cli/commands/vulnerabilities.py @@ -1,17 +1,314 @@ """CLI/Commands - Vulnerabilities.""" +# Copyright 2026 Cloudsmith Ltd + import click +from rich.console import Console +from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn +from rich.table import Table +from ...core.api.packages import list_packages from ...core.api.vulnerabilities import ( _print_vulnerabilities_assessment_table, _print_vulnerabilities_summary_table, get_package_scan_result, ) from .. import decorators, utils, validators -from ..exceptions import handle_api_exceptions from .main import main +def get_packages_in_repo(opts, owner, repo): + """Get all packages in a repository, paginating through all pages.""" + all_packages = [] + page = 1 + page_size = 100 # fetch in larger batches for efficiency + + try: + while True: + packages, page_info = list_packages( + opts=opts, + owner=owner, + repo=repo, + query=None, + sort=None, + page=page, + page_size=page_size, + ) + + if packages: + all_packages.extend(packages) + + # No page info means single page or no results + if not page_info: + break + + current_page = getattr(page_info, "page", page) + total_pages = getattr(page_info, "page_total", 1) + + if current_page >= total_pages: + break + + page += 1 + + except Exception as exc: + raise click.ClickException( + f"Failed to list packages for '{owner}/{repo}'. " + f"Please check the owner and repository names are correct. " + f"Detail: {exc}" + ) from exc + + if not all_packages: + raise click.ClickException( + f"No packages found in '{owner}/{repo}'. " + f"The repository may be empty, or the owner/repo names may be incorrect." + ) + + return [ + (pkg["slug_perm"], pkg.get("name", pkg["slug_perm"]), pkg.get("version", "")) + for pkg in all_packages + ] + + +def _has_scan_results(data): + """Check whether scan data contains actual scan results.""" + scans = getattr(data, "scans", None) + if scans is None: + return False + return len(scans) > 0 + + +def _has_vulnerabilities(data): + """Check whether scan data contains any vulnerability results.""" + scans = getattr(data, "scans", []) + return any(getattr(scan, "results", []) for scan in scans) + + +def _aggregate_severity_counts(data, severity_filter=None): + """Aggregate vulnerability counts by severity for a single package scan.""" + severity_keys = ["critical", "high", "medium", "low", "unknown"] + + if severity_filter: + allowed = [s.strip().lower() for s in severity_filter.split(",")] + severity_keys = [k for k in severity_keys if k in allowed] + + counts = {k: 0 for k in severity_keys} + + scans = getattr(data, "scans", []) + for scan in scans: + results = getattr(scan, "results", []) + for result in results: + severity = getattr(result, "severity", "unknown").lower() + if severity in counts: + counts[severity] += 1 + elif "unknown" in counts: + counts["unknown"] += 1 + + return counts + + +def _apply_filters(data, severity_filter, fixable): + """Apply severity and fixable filters to scan results in-place. Returns filtered count.""" + total_filtered = 0 + scans = getattr(data, "scans", []) + + allowed_severities = ( + [s.strip().lower() for s in severity_filter.split(",")] + if severity_filter + else None + ) + + for scan in scans: + results = getattr(scan, "results", []) + + if allowed_severities: + results = [ + res + for res in results + if getattr(res, "severity", "unknown").lower() in allowed_severities + ] + + if fixable is not None: + results = [ + res + for res in results + if bool( + getattr(res, "fix_version", getattr(res, "fixed_version", None)) + ) + is fixable + ] + + scan.results = results + total_filtered += len(results) + + return total_filtered + + +# Severity color mapping for consistent styling +SEVERITY_COLORS = { + "critical": "red", + "high": "bright_red", + "medium": "yellow", + "low": "blue", + "unknown": "dim white", +} + + +def _colorize_count(count, severity_key): + """Return a rich-styled count string, colored only when count > 0.""" + if count > 0: + color = SEVERITY_COLORS.get(severity_key, "white") + return f"[{color}]{count}[/{color}]" + return f"[dim]{count}[/dim]" + + +def _print_repo_summary_table(package_rows, severity_filter=None): + """Print a single combined summary table for all packages in a repo.""" + severity_keys = { + "Critical": "critical", + "High": "high", + "Medium": "medium", + "Low": "low", + "Unknown": "unknown", + } + + if severity_filter: + allowed = [s.strip().lower() for s in severity_filter.split(",")] + severity_keys = {k: v for k, v in severity_keys.items() if v in allowed} + + console = Console() + table = Table( + title="Repository Vulnerabilities Summary", + show_header=True, + header_style="bold", + show_lines=True, + border_style="bright_black", + padding=(0, 1), + ) + + table.add_column("Package", justify="left", style="cyan", no_wrap=True) + table.add_column("Identifier", justify="left", style="dim", no_wrap=True) + for display_name, sev_key in severity_keys.items(): + color = SEVERITY_COLORS.get(sev_key, "white") + table.add_column(display_name, justify="center", header_style=f"bold {color}") + table.add_column("Total", justify="center", header_style="bold white") + + grand_total = 0 + num_sev_cols = len(severity_keys) + + for slug_perm, label, counts, status in package_rows: + cells = [label, slug_perm] + if status == "no_scan": + cells.append("[dim italic]Security scan not supported[/dim italic]") + cells.extend([""] * (num_sev_cols - 1)) + cells.append("") + elif status == "no_issues_found": + cells.append("[bold green]No issues found[/bold green]") + cells.extend([""] * (num_sev_cols - 1)) + cells.append("") + else: + row_total = 0 + for _display, sev_key in severity_keys.items(): + count = counts.get(sev_key, 0) + cells.append(_colorize_count(count, sev_key)) + row_total += count + total_style = "[bold red]" if row_total > 0 else "[dim]" + cells.append(f"{total_style}{row_total}[/]") + grand_total += row_total + table.add_row(*cells) + + console.print() + console.print(table) + console.print(f"\nTotal Vulnerabilities: [bold]{grand_total}[/bold]\n") + + +def _collect_repo_scan_data(opts, owner, repo, slugs, severity_filter, fixable): + """Silently collect scan data for all packages with a progress bar. + + Returns list of (slug, label, counts, status) tuples where status is one of + "vulnerable", "safe", or "no_scan". Sorted: vulnerable (by count desc), + then safe, then no_scan. + """ + rows = [] + console = Console(stderr=True) + + with Progress( + SpinnerColumn(), + TextColumn("[bold blue]{task.description}"), + BarColumn(bar_width=40), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TextColumn("({task.completed}/{task.total})"), + console=console, + transient=True, + ) as progress: + task = progress.add_task("Scanning packages...", total=len(slugs)) + + for slug, pkg_name_fallback, pkg_version_fallback in slugs: + progress.update(task, description=f"Processing {slug}...") + fallback_label = ( + f"{pkg_name_fallback}:{pkg_version_fallback}" + if pkg_version_fallback + else pkg_name_fallback + ) + + try: + data = get_package_scan_result( + opts=opts, + owner=owner, + repo=repo, + package=slug, + show_assessment=False, + severity_filter=severity_filter, + fixable=fixable, + ) + except Exception: # pylint: disable=broad-exception-caught + rows.append((slug, fallback_label, {}, "no_scan")) + progress.advance(task) + continue + + # Build label from scan response metadata, fall back to list_packages data + pkg_data = getattr(data, "package", None) if data else None + pkg_name = ( + getattr(pkg_data, "name", pkg_name_fallback) + if pkg_data + else pkg_name_fallback + ) + pkg_version = ( + getattr(pkg_data, "version", pkg_version_fallback) + if pkg_data + else pkg_version_fallback + ) + label = f"{pkg_name}:{pkg_version}" if pkg_version else pkg_name + + if not data or not _has_scan_results(data): + rows.append((slug, label, {}, "no_scan")) + progress.advance(task) + continue + + # Apply filters if active + if severity_filter or fixable is not None: + _apply_filters(data, severity_filter, fixable) + + counts = _aggregate_severity_counts(data, severity_filter) + + if sum(counts.values()) > 0: + rows.append((slug, label, counts, "vulnerable")) + else: + rows.append((slug, label, counts, "no_issues_found")) + + progress.advance(task) + + # Sort: vulnerable first (by total desc), then safe, then no_scan + # When filters are active, only return packages with matching vulnerabilities + filters_active = severity_filter or fixable is not None + vulnerable = [r for r in rows if r[3] == "vulnerable"] + vulnerable.sort(key=lambda r: sum(r[2].values()), reverse=True) + if filters_active: + return vulnerable + safe = [r for r in rows if r[3] == "no_issues_found"] + no_scan = [r for r in rows if r[3] == "no_scan"] + return vulnerable + safe + no_scan + + @main.command() @decorators.common_cli_config_options @decorators.common_cli_output_options @@ -19,8 +316,8 @@ @decorators.initialise_api @click.argument( "owner_repo_package", - metavar="OWNER/REPO/PACKAGE", - callback=validators.validate_owner_repo_package, + metavar="OWNER/REPO or OWNER/REPO/PACKAGE", + callback=validators.validate_required_owner_repo_optional_slug_perm, ) @click.option( "-A", @@ -31,7 +328,7 @@ @click.option( "--fixable/--non-fixable", is_flag=True, - default=None, # Changed to allow None (Show All) vs True/False + default=None, # allow None (show all) vs True/False help="Filter by fixable status (only fixable vs only non-fixable).", ) @click.option( @@ -71,17 +368,92 @@ def vulnerabilities( \b # Filter by fixable or non-fixable vulnerabilities cloudsmith vulnerabilities myorg/repo/pkg_identifier --fixable / --non-fixable - - """ use_stderr = utils.should_use_stderr(opts) + repo_summary = False - owner, repo, slug = owner_repo_package + if len(owner_repo_package) == 3: + owner, repo, slug = owner_repo_package + else: + owner, repo = owner_repo_package + slug = None + repo_summary = True - total_filtered_vulns = 0 + if not owner or not repo: + raise click.ClickException( + "Both owner and repository must be specified (e.g., 'myorg/myrepo')." + ) + + if repo_summary and show_assessment: + click.secho( + "Show full assessment is not supported for the repo level summary.", + fg="yellow", + err=use_stderr, + ) + return + + # Repo summary mode: collect with progress bar, then output once + if repo_summary: + slugs = get_packages_in_repo(opts, owner, repo) + + repo_summary_rows = _collect_repo_scan_data( + opts, owner, repo, slugs, severity_filter, fixable + ) + + if not repo_summary_rows: + if severity_filter or fixable is not None: + filter_desc = severity_filter.upper() if severity_filter else None + if fixable is True: + filter_desc = ( + f"{filter_desc}, fixable" if filter_desc else "fixable" + ) + elif fixable is False: + filter_desc = ( + f"{filter_desc}, non-fixable" if filter_desc else "non-fixable" + ) + click.secho( + f"No packages found matching filter(s) [{filter_desc}] " + f"in '{owner}/{repo}'.", + fg="yellow", + err=use_stderr, + ) + else: + click.secho( + f"No scan data could be retrieved for any packages " + f"in '{owner}/{repo}'.", + fg="yellow", + err=use_stderr, + ) + return + + json_output = { + "owner": owner, + "repository": repo, + "packages": [ + { + "slug_perm": slug_perm, + "package": label, + "status": status, + "vulnerabilities": counts, + } + for slug_perm, label, counts, status in repo_summary_rows + ], + } - context_msg = "Failed to retrieve vulnerability report!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + if utils.maybe_print_as_json(opts, json_output): + return + + # Table only needs label, counts, and status + _print_repo_summary_table( + [ + (slug_perm, label, counts, status) + for slug_perm, label, counts, status in repo_summary_rows + ], + severity_filter, + ) + return + + try: with utils.maybe_spinner(opts): data = get_package_scan_result( opts=opts, @@ -92,45 +464,43 @@ def vulnerabilities( severity_filter=severity_filter, fixable=fixable, ) + except Exception as exc: # pylint: disable=broad-exception-caught + raise click.ClickException( + f"Failed to retrieve vulnerability report for " + f"'{owner}/{repo}/{slug}': {exc}" + ) from exc - click.secho("OK", fg="green", err=use_stderr) + if not data or not _has_scan_results(data): + click.secho( + f"No scan results found for '{owner}/{repo}/{slug}'. " + f"The package may not have been scanned yet or is not supported.", + fg="yellow", + err=use_stderr, + ) + return + + total_filtered_vulns = 0 - # Filter results if severity or fixable flags are active if severity_filter or fixable is not None: - scans = getattr(data, "scans", []) + total_filtered_vulns = _apply_filters(data, severity_filter, fixable) - allowed_severities = ( - [s.strip().lower() for s in severity_filter.split(",")] - if severity_filter - else None + if not _has_vulnerabilities(data) and total_filtered_vulns == 0: + click.secho( + f"Scan completed for '{owner}/{repo}/{slug}': " + f"no vulnerabilities detected.", + fg="green", + err=use_stderr, ) + else: + click.secho("OK", fg="green", err=use_stderr) - for scan in scans: - results = getattr(scan, "results", []) - - # 1. Filter by Severity - if allowed_severities: - results = [ - res - for res in results - if getattr(res, "severity", "unknown").lower() in allowed_severities - ] - - # 2. Filter by Fixable Status - # fixable=True: Keep only if has fix_version - # fixable=False: Keep only if NO fix_version - if fixable is not None: - results = [ - res - for res in results - if bool( - getattr(res, "fix_version", getattr(res, "fixed_version", None)) - ) - is fixable - ] - - scan.results = results - total_filtered_vulns += len(results) + if total_filtered_vulns == 0 and (severity_filter or fixable is not None): + click.secho( + f"Scan completed for '{owner}/{repo}/{slug}' but no " + f"vulnerabilities matched the applied filters.", + fg="yellow", + err=use_stderr, + ) if utils.maybe_print_as_json(opts, data): return diff --git a/cloudsmith_cli/cli/tests/commands/test_vulnerabilities.py b/cloudsmith_cli/cli/tests/commands/test_vulnerabilities.py index 03a092ee..e2ca3b65 100644 --- a/cloudsmith_cli/cli/tests/commands/test_vulnerabilities.py +++ b/cloudsmith_cli/cli/tests/commands/test_vulnerabilities.py @@ -1,5 +1,5 @@ import unittest -from unittest.mock import patch +from unittest.mock import MagicMock, patch from click.testing import CliRunner @@ -108,5 +108,274 @@ def test_vulnerabilities_non_fixable_filter(self, mock_get_scan): self.assertFalse(args["fixable"]) +# --------------------------------------------------------------------------- +# Helpers shared by repo-summary tests +# --------------------------------------------------------------------------- + + +def _pkg_dict(slug_perm, name, version="1.0.0"): + return {"slug_perm": slug_perm, "name": name, "version": version} + + +def _page_info(page=1, page_total=1): + pi = MagicMock() + pi.page = page + pi.page_total = page_total + return pi + + +def _scan_data_vulnerable(name, version, severities): + """Scan result with at least one vulnerability.""" + data = MagicMock() + data.package.name = name + data.package.version = version + scan = MagicMock() + scan.results = [MagicMock(severity=s) for s in severities] + data.scans = [scan] + return data + + +def _scan_data_safe(name, version): + """Scan result with no vulnerabilities (package was scanned, nothing found).""" + data = MagicMock() + data.package.name = name + data.package.version = version + scan = MagicMock() + scan.results = [] + data.scans = [scan] + return data + + +def _scan_data_no_scan(): + """No scan data available (package not yet scanned or unsupported format).""" + data = MagicMock() + data.scans = [] + return data + + +# --------------------------------------------------------------------------- +# Repo-level summary tests (OWNER/REPO, no package slug) +# --------------------------------------------------------------------------- + + +class TestRepoSummaryMode(unittest.TestCase): + def setUp(self): + self.runner = CliRunner() + + # --show-assessment is rejected for repo-level summary ────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_show_assessment_rejected(self, mock_list): + """--show-assessment with OWNER/REPO prints a warning and exits cleanly.""" + result = self.runner.invoke( + vulnerabilities, ["testorg/testrepo", "--show-assessment"] + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("not supported", result.output) + mock_list.assert_not_called() + + # Single-package scenarios ─────────────────────────────────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_single_vulnerable_package(self, mock_list, mock_scan): + """A repo with one vulnerable package produces a summary table.""" + mock_list.return_value = ([_pkg_dict("slug-abc", "my-lib")], _page_info()) + mock_scan.return_value = _scan_data_vulnerable( + "my-lib", "1.0.0", ["critical", "high"] + ) + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertEqual(result.exit_code, 0) + mock_scan.assert_called_once() + scan_args = mock_scan.call_args[1] + self.assertEqual(scan_args["owner"], "testorg") + self.assertEqual(scan_args["repo"], "testrepo") + self.assertEqual(scan_args["package"], "slug-abc") + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_single_safe_package(self, mock_list, mock_scan): + """A scanned package with no vulnerabilities is included in the summary.""" + mock_list.return_value = ([_pkg_dict("slug-safe", "clean-lib")], _page_info()) + mock_scan.return_value = _scan_data_safe("clean-lib", "2.0.0") + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertEqual(result.exit_code, 0) + mock_scan.assert_called_once() + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_single_no_scan_package(self, mock_list, mock_scan): + """A package with no scan data (unsupported format) is included with no-scan status.""" + mock_list.return_value = ([_pkg_dict("slug-bin", "binary-pkg")], _page_info()) + mock_scan.return_value = _scan_data_no_scan() + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertEqual(result.exit_code, 0) + mock_scan.assert_called_once() + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_scan_fetch_exception_treated_as_no_scan(self, mock_list, mock_scan): + """A package whose scan fetch raises is included with no-scan status, not dropped.""" + mock_list.return_value = ([_pkg_dict("slug-err", "error-pkg")], _page_info()) + mock_scan.side_effect = Exception("connection refused") + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + # Command should still exit cleanly — the package appears as no-scan + self.assertEqual(result.exit_code, 0) + + # Multiple packages ────────────────────────────────────────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_multiple_packages_mixed_statuses(self, mock_list, mock_scan): + """Three packages with different statuses are all fetched and summarised.""" + packages = [ + _pkg_dict("slug-vuln", "vuln-lib"), + _pkg_dict("slug-safe", "safe-lib"), + _pkg_dict("slug-none", "unsupported-pkg"), + ] + mock_list.return_value = (packages, _page_info()) + mock_scan.side_effect = [ + _scan_data_vulnerable("vuln-lib", "1.0.0", ["critical"]), + _scan_data_safe("safe-lib", "1.0.0"), + _scan_data_no_scan(), + ] + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertEqual(result.exit_code, 0) + self.assertEqual(mock_scan.call_count, 3) + + # Pagination ───────────────────────────────────────────────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_pagination_fetches_all_pages(self, mock_list, mock_scan): + """Packages spanning two pages are all fetched.""" + page1_pkgs = [_pkg_dict("slug-a", "pkg-a"), _pkg_dict("slug-b", "pkg-b")] + page2_pkgs = [_pkg_dict("slug-c", "pkg-c")] + mock_list.side_effect = [ + (page1_pkgs, _page_info(page=1, page_total=2)), + (page2_pkgs, _page_info(page=2, page_total=2)), + ] + mock_scan.return_value = _scan_data_safe("pkg", "1.0.0") + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertEqual(result.exit_code, 0) + self.assertEqual(mock_list.call_count, 2) + self.assertEqual(mock_scan.call_count, 3) + + # Verify page numbers were incremented correctly + page_numbers = [c[1]["page"] for c in mock_list.call_args_list] + self.assertEqual(page_numbers, [1, 2]) + + # Empty repo ───────────────────────────────────────────────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_empty_repo_exits_with_error(self, mock_list): + """An empty repository raises a ClickException with a helpful message.""" + mock_list.return_value = ([], None) + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertNotEqual(result.exit_code, 0) + + # --severity filter ────────────────────────────────────────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_severity_filter_passed_to_scan(self, mock_list, mock_scan): + """--severity is forwarded to get_package_scan_result.""" + mock_list.return_value = ([_pkg_dict("slug-a", "pkg-a")], _page_info()) + mock_scan.return_value = _scan_data_vulnerable("pkg-a", "1.0.0", ["critical"]) + + result = self.runner.invoke( + vulnerabilities, ["testorg/testrepo", "--severity", "CRITICAL"] + ) + + self.assertEqual(result.exit_code, 0) + scan_args = mock_scan.call_args[1] + self.assertEqual(scan_args["severity_filter"], "CRITICAL") + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_severity_filter_no_matches_shows_message(self, mock_list, mock_scan): + """When --severity matches nothing, a descriptive message is shown.""" + mock_list.return_value = ([_pkg_dict("slug-a", "pkg-a")], _page_info()) + # Safe package — no critical vulnerabilities + mock_scan.return_value = _scan_data_safe("pkg-a", "1.0.0") + + result = self.runner.invoke( + vulnerabilities, ["testorg/testrepo", "--severity", "CRITICAL"] + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("No packages found matching filter", result.output) + self.assertIn("CRITICAL", result.output) + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + def test_severity_filter_excludes_safe_and_no_scan(self, mock_list, mock_scan): + """With --severity active, safe and no-scan packages are not included in results.""" + packages = [ + _pkg_dict("slug-vuln", "vuln-lib"), + _pkg_dict("slug-safe", "safe-lib"), + ] + mock_list.return_value = (packages, _page_info()) + mock_scan.side_effect = [ + _scan_data_vulnerable("vuln-lib", "1.0.0", ["critical"]), + _scan_data_safe("safe-lib", "1.0.0"), + ] + + # Patch the table printer so we can inspect what rows were passed + with patch( + "cloudsmith_cli.cli.commands.vulnerabilities._print_repo_summary_table" + ) as mock_table: + result = self.runner.invoke( + vulnerabilities, ["testorg/testrepo", "--severity", "CRITICAL"] + ) + + self.assertEqual(result.exit_code, 0) + passed_rows = mock_table.call_args[0][0] + statuses = [row[3] for row in passed_rows] + self.assertIn("vulnerable", statuses) + self.assertNotIn("safe", statuses) + self.assertNotIn("no_issues_found", statuses) + self.assertNotIn("no_scan", statuses) + + # JSON output ──────────────────────────────────────────────────────────── + + @patch("cloudsmith_cli.cli.commands.vulnerabilities.get_package_scan_result") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.list_packages") + @patch("cloudsmith_cli.cli.commands.vulnerabilities.utils") + def test_json_output_includes_status_field(self, mock_utils, mock_list, mock_scan): + """JSON output includes slug_perm, package, status, and vulnerabilities per package.""" + mock_list.return_value = ([_pkg_dict("slug-abc", "my-lib")], _page_info()) + mock_scan.return_value = _scan_data_vulnerable("my-lib", "1.0.0", ["high"]) + mock_utils.should_use_stderr.return_value = False + mock_utils.maybe_print_as_json.return_value = True # pretend JSON was printed + + result = self.runner.invoke(vulnerabilities, ["testorg/testrepo"]) + + self.assertEqual(result.exit_code, 0) + json_payload = mock_utils.maybe_print_as_json.call_args[0][1] + self.assertEqual(json_payload["owner"], "testorg") + self.assertEqual(json_payload["repository"], "testrepo") + self.assertEqual(len(json_payload["packages"]), 1) + pkg = json_payload["packages"][0] + self.assertEqual(pkg["slug_perm"], "slug-abc") + self.assertIn("status", pkg) + self.assertIn("vulnerabilities", pkg) + + if __name__ == "__main__": unittest.main() diff --git a/cloudsmith_cli/cli/validators.py b/cloudsmith_cli/cli/validators.py index f739d8d0..9f5bb796 100644 --- a/cloudsmith_cli/cli/validators.py +++ b/cloudsmith_cli/cli/validators.py @@ -126,6 +126,12 @@ def validate_required_owner_optional_repo(ctx, param, value): return validate_slashes(param, value, minimum=1, maximum=2, form=form) +def validate_required_owner_repo_optional_slug_perm(ctx, param, value): + """Ensure that owner/repo is formatted correctly, where owner/repo is required and slug_perm is optional.""" + form = "OWNER/REPO[/SLUG_PERM]" + return validate_slashes(param, value, minimum=2, maximum=3, form=form) + + def validate_owner(ctx, param, value): """Ensure that owner is formatted correctly.""" # pylint: disable=unused-argument diff --git a/cloudsmith_cli/core/api/vulnerabilities.py b/cloudsmith_cli/core/api/vulnerabilities.py index 5f6927f4..acc4a537 100644 --- a/cloudsmith_cli/core/api/vulnerabilities.py +++ b/cloudsmith_cli/core/api/vulnerabilities.py @@ -2,20 +2,39 @@ import click import cloudsmith_api +from rich.console import Console +from rich.table import Table from ...cli import utils from .. import ratelimits from .exceptions import catch_raise_api_exception from .init import get_api_client +# Severity color mapping for consistent styling +SEVERITY_COLORS = { + "critical": "red", + "high": "bright_red", + "medium": "yellow", + "low": "blue", + "unknown": "dim white", +} + def get_vulnerabilities_api(): """Get the vulnerabilities API client.""" return get_api_client(cloudsmith_api.VulnerabilitiesApi) +def _colorize_count(count, severity_key): + """Return a rich-styled count string, colored only when count > 0.""" + if count > 0: + color = SEVERITY_COLORS.get(severity_key, "white") + return f"[{color}]{count}[/{color}]" + return f"[dim]{count}[/dim]" + + def _print_vulnerabilities_summary_table(data, severity_filter, total_filtered_vulns): - """Print vulnerabilities as a table.""" + """Print vulnerabilities as a color-coded table.""" severity_keys = { "Critical": "critical", @@ -29,10 +48,6 @@ def _print_vulnerabilities_summary_table(data, severity_filter, total_filtered_v allowed = [s.strip().lower() for s in severity_filter.split(",")] severity_keys = {k: v for k, v in severity_keys.items() if v in allowed} - headers = [{"header": "Package", "justify": "left", "style": "cyan"}] - for key in severity_keys.keys(): - headers.append({"header": key, "justify": "center", "style": "white"}) - # Get package name and version for the target label pkg_data = getattr(data, "package", None) pkg_name = getattr(pkg_data, "name", "Unknown") @@ -53,29 +68,43 @@ def _print_vulnerabilities_summary_table(data, severity_filter, total_filtered_v elif "unknown" in counts: counts["unknown"] += 1 - # Create the single summary row - row = [target_label] - for _header, key in severity_keys.items(): - row.append(str(counts[key])) - - rows = [row] - - click.echo() - click.echo() - - utils.rich_print_table(headers=headers, rows=rows, title="Vulnerabilities Summary") + # Build the rich table + console = Console() + table = Table( + title="Vulnerabilities Summary", + show_header=True, + header_style="bold", + show_lines=True, + border_style="bright_black", + padding=(0, 1), + ) + + table.add_column("Package", justify="left", style="cyan", no_wrap=True) + for display_name, sev_key in severity_keys.items(): + color = SEVERITY_COLORS.get(sev_key, "white") + table.add_column(display_name, justify="center", header_style=f"bold {color}") + table.add_column("Total", justify="center", header_style="bold white") + + # Build the row + row_total = 0 + cells = [target_label] + for _display, sev_key in severity_keys.items(): + count = counts.get(sev_key, 0) + cells.append(_colorize_count(count, sev_key)) + row_total += count + total_style = "[bold red]" if row_total > 0 else "[dim]" + cells.append(f"{total_style}{row_total}[/]") + table.add_row(*cells) + + console.print() + console.print(table) if severity_filter: - filters = severity_filter.upper() - click.echo( - f"\nTotal Vulnerabilities: {getattr(data, 'num_vulnerabilities', 0)}" + console.print( + f"\nFiltered Vulnerabilities: [bold]{total_filtered_vulns}[/bold]\n" ) - click.echo(f"\nTotal {filters} Vulnerabilities: {total_filtered_vulns}") else: - click.echo( - f"\nTotal Vulnerabilities: {getattr(data, 'num_vulnerabilities', 0)}" - ) - click.echo() + console.print(f"\nTotal Vulnerabilities: [bold]{row_total}[/bold]\n") def _print_vulnerabilities_assessment_table(data, severity_filter=None): @@ -206,6 +235,12 @@ def get_package_scan_identifier(owner, repo, package): ratelimits.maybe_rate_limit(client, headers) + if not data: + # click.echo( + # f"No vulnerability scan results found for package: {package}", err=True + # ) + return None + return data[0].identifier