Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 38 additions & 28 deletions quale/reports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)

if TYPE_CHECKING:
pass
from quale.scanner import CodebaseAnalysis

# ── CI Report ─────────────────────────────────────────────────────

Expand Down Expand Up @@ -589,7 +589,7 @@ def preflight_report(path: str = ".", files: list[str] | None = None,
mirror = _mirror_signals(changed, analysis.file_vocabs)

try:
stability_data = compute_stability(path, weeks=12)
stability_data = compute_stability(path, weeks=12, analysis=analysis)
except Exception:
stability_data = []

Expand Down Expand Up @@ -628,11 +628,11 @@ def preflight_report(path: str = ".", files: list[str] | None = None,

# Tier 1 signals — temperature per changed file
try:
lifecycle_data = compute_lifecycles(path, weeks=24)
lifecycle_data = compute_lifecycles(path, weeks=24, analysis=analysis)
except Exception:
lifecycle_data = []
try:
entropy_data = entropy_velocity(path, weeks=12)
entropy_data = entropy_velocity(path, weeks=12, analysis=analysis)
except Exception:
entropy_data = None
file_temps = {}
Expand Down Expand Up @@ -3666,16 +3666,18 @@ def _phrase_set(analysis):
"migration_order": "Apply mask to loose craters first, then tight." if craters else "No impact craters found.",
}

def capillary_report(path: str = ".", top_n: int = 5) -> dict:
def capillary_report(path: str = ".", top_n: int = 5,
analysis: CodebaseAnalysis | None = None) -> dict:
"""Capillary action — high-edge-count files (brittle coupling)."""
if not vgit.is_repo(path):
return {"error": "Not a git repository."}
path = os.path.abspath(path)
from quale.scanner import scan_codebase
try:
analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30)
except Exception as e:
return {"error": f"scan failed: {e}"}
if analysis is None:
try:
analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30)
except Exception as e:
return {"error": f"scan failed: {e}"}
token_re = re.compile(r'\b[A-Z][a-zA-Z0-9_]{4,40}\b')
code_exts = frozenset({".go", ".ts", ".js", ".py", ".rs", ".rb", ".java", ".c", ".cpp", ".h", ".zig", ".ex", ".exs", ".nix", ".jl"})
file_tokens: dict[str, set[str]] = {}
Expand Down Expand Up @@ -3833,17 +3835,18 @@ def _tokens(fp):
return {"file_a": file_a, "file_b": file_b, "overlap": overlap,
"label": "divergence gap" if overlap < 0.1 else ("over-trap" if overlap > 0.3 else "ideal trap")}

def thanatosis_report(path: str = ".") -> dict:
def thanatosis_report(path: str = ".", analysis: CodebaseAnalysis | None = None) -> dict:
if not vgit.is_repo(path):
return {"error": "Not a git repository."}
path = os.path.abspath(path)
from collections import Counter

from quale.scanner import scan_codebase
try:
analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30)
except Exception as e:
return {"error": f"scan failed: {e}"}
if analysis is None:
try:
analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30)
except Exception as e:
return {"error": f"scan failed: {e}"}
token_re = re.compile(r'\b[A-Z][a-zA-Z0-9_]{4,40}\b')
ft = {}
for fv in analysis.file_vocabs:
Expand Down Expand Up @@ -4098,13 +4101,13 @@ def cleanup_list_report(path: str = ".") -> dict:
items.append({"identifier": t["identifier"], "files": t["files"], "effort": label})
return {"items": items, "free_to_delete": sum(1 for i in items if i["effort"] == "ESCAPED")}

def vulnerability_report(path: str = ".") -> dict:
def vulnerability_report(path: str = ".", analysis: CodebaseAnalysis | None = None) -> dict:
if not vgit.is_repo(path):
return {"error": "Not a git repository."}
p = os.path.abspath(path)
try:
tt = thanatosis_report(path=p)
cp = capillary_report(path=p)
tt = thanatosis_report(path=p, analysis=analysis)
cp = capillary_report(path=p, analysis=analysis)
except Exception as e:
return {"error": f"scan: {e}"}
dt = {f["file"] for f in tt.get("files", [])}
Expand Down Expand Up @@ -5815,7 +5818,8 @@ def _classify_files(

# ── Stability anchors ─────────────────────────────────────────────

def compute_stability(path: str, weeks: int = 12, min_appearances: int = 4) -> list[dict]:
def compute_stability(path: str, weeks: int = 12, min_appearances: int = 4,
analysis: CodebaseAnalysis | None = None) -> list[dict]:
"""Per-file stability using git log (single call) instead of N rescans.

Issues ONE `git log --name-only` call for the entire window, buckets file
Expand All @@ -5832,7 +5836,8 @@ def compute_stability(path: str, weeks: int = 12, min_appearances: int = 4) -> l

from quale.scanner import scan_codebase

analysis = scan_codebase(path, quiet=True, max_files=2000, max_seconds=25)
if analysis is None:
analysis = scan_codebase(path, quiet=True, max_files=2000, max_seconds=25)
if not analysis.file_vocabs:
return []

Expand Down Expand Up @@ -5900,7 +5905,8 @@ def compute_stability(path: str, weeks: int = 12, min_appearances: int = 4) -> l
".r", ".jl", ".scala",
})

def compute_lifecycles(path: str, weeks: int = 24) -> list[dict]:
def compute_lifecycles(path: str, weeks: int = 24,
analysis: CodebaseAnalysis | None = None) -> list[dict]:
"""Concept lifecycles using git diff (no per-file content reads).

Scans HEAD once, then uses git diff --unified=0 between weekly pairs to
Expand All @@ -5921,10 +5927,12 @@ def compute_lifecycles(path: str, weeks: int = 24) -> list[dict]:
rename_pairs: list[tuple[str, str, int]] = []

# Scan HEAD once
try:
head_analysis = scan_codebase(path, quiet=True, max_files=1500, max_seconds=20)
except Exception:
head_analysis = None
head_analysis = analysis
if head_analysis is None:
try:
head_analysis = scan_codebase(path, quiet=True, max_files=1500, max_seconds=20)
except Exception:
head_analysis = None

if head_analysis:
for fv in head_analysis.file_vocabs:
Expand Down Expand Up @@ -6318,15 +6326,15 @@ def health_score(path: str) -> float:

# Stability: stable anchor proportion
try:
stability_data = compute_stability(path, weeks=12)
stability_data = compute_stability(path, weeks=12, analysis=analysis)
stable_count = sum(1 for s in stability_data if s["persistence"] >= 0.8)
stable_ratio = min(stable_count / max(len(stability_data), 1), 1.0)
except Exception:
stable_ratio = 0.5

# Concept age
try:
lifecycle_data = compute_lifecycles(path, weeks=24)
lifecycle_data = compute_lifecycles(path, weeks=24, analysis=analysis)
if lifecycle_data:
dead = sum(1 for lc in lifecycle_data if lc["signal"] == "DEAD")
total_concepts = len(lifecycle_data)
Expand Down Expand Up @@ -7576,7 +7584,8 @@ def orient_report(path: str) -> dict:

# ── Entropy Velocity ─────────────────────────────────────────────

def entropy_velocity(path: str, weeks: int = 12, interval_weeks: int = 4) -> dict:
def entropy_velocity(path: str, weeks: int = 12, interval_weeks: int = 4,
analysis: CodebaseAnalysis | None = None) -> dict:
"""Shannon entropy of vocabulary distribution over time.

Scans HEAD once, then walks backwards through weekly refs using git diff to
Expand All @@ -7599,7 +7608,8 @@ def entropy_velocity(path: str, weeks: int = 12, interval_weeks: int = 4) -> dic
next_stop = len(week_data) - 1

# Scan HEAD once
analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30)
if analysis is None:
analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30)
if not analysis.file_vocabs:
return {"error": "No files scanned.", "schema_version": 1}

Expand Down
6 changes: 6 additions & 0 deletions quale/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ def scan_codebase(path: str, git_ref: str | None = None, quiet: bool = False,
)
if len(_SCAN_CACHE) < _SCAN_CACHE_MAX:
_SCAN_CACHE[key] = result
# A completed deep=True scan also seeds the deep=False cache entry (only if
# that entry is absent), since deep vocabulary is a strict superset of shallow.
if deep:
shallow_key = _scan_cache_key(path, git_ref, deep=False)
if shallow_key not in _SCAN_CACHE:
_SCAN_CACHE[shallow_key] = result
return result


Expand Down
2 changes: 1 addition & 1 deletion tests/snapshots/health_score.snap
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"excess_porosity": -0.014024,
"excess_porosity_rounded": -0.014,
"schema_version": null
}
2 changes: 1 addition & 1 deletion tests/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ def test_health_score_snapshot(self):
data = json.loads(r.stdout)
snapshot = {
"schema_version": data.get("schema_version"),
"excess_porosity": data.get("excess_porosity"),
"excess_porosity_rounded": round(data.get("excess_porosity"), 4) if data.get("excess_porosity") is not None else None,
}
self.assert_snapshot("health_score", json.dumps(snapshot, indent=2, sort_keys=True))
Loading