diff --git a/.quale/ci-history.jsonl b/.quale/ci-history.jsonl index 3f98103..881f978 100644 --- a/.quale/ci-history.jsonl +++ b/.quale/ci-history.jsonl @@ -78,3 +78,21 @@ {"timestamp": 1779985490.708004, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 4, "blast_radius_count": 24, "mirror_gap_ratio": 0.156, "stable_touched_count": 4, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 1} {"timestamp": 1779985500.4287434, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 4, "blast_radius_count": 24, "mirror_gap_ratio": 0.156, "stable_touched_count": 4, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 1} {"timestamp": 1779985501.299929, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 4, "blast_radius_count": 24, "mirror_gap_ratio": 0.156, "stable_touched_count": 4, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 1} +{"timestamp": 1779985964.562954, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": 
[".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779985971.1160638, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779991982.5523138, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779991997.8795328, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779991999.5090065, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": 
[".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779992721.2087755, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779992732.0732007, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779992733.1683042, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993098.6285026, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": 
[".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993104.057374, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993492.9940534, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993496.3423092, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993534.5206554, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": 
[".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993543.5418565, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1779993544.5609477, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 5, "blast_radius_count": 27, "mirror_gap_ratio": 0.158, "stable_touched_count": 5, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/cli.py", "hub_rank": 3}], "clone_flagged": [{"file": ".quale/ci-history.jsonl", "clone_group": [".github/ISSUE_TEMPLATE/config.yml", ".github/PULL_REQUEST_TEMPLATE.md", ".github/workflows/stale.yml"], "similarity": 0.083}], "new_identifier_count": 2} +{"timestamp": 1780006290.737936, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 4, "blast_radius_count": 24, "mirror_gap_ratio": 0.099, "stable_touched_count": 4, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/reports/__init__.py", "hub_rank": 1}, {"file": "quale/scanner.py", "hub_rank": 2}], "clone_flagged": [], "new_identifier_count": 0} +{"timestamp": 1780006301.5427566, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 4, "blast_radius_count": 24, "mirror_gap_ratio": 0.099, "stable_touched_count": 4, "max_blast_tier": "critical", "hub_risk_flagged": [{"file": "quale/reports/__init__.py", "hub_rank": 1}, {"file": "quale/scanner.py", "hub_rank": 2}], "clone_flagged": [], "new_identifier_count": 0} +{"timestamp": 1780006302.606474, "base_ref": "HEAD~1", "head_ref": "HEAD", "changed_files": 4, 
def pmi(self, a: str, b: str) -> float:
    """Return the pointwise mutual information of phrases *a* and *b*.

    PMI = log2(P(a, b) / (P(a) * P(b))), where each probability is a count
    divided by ``self.total_docs``.  The result is symmetric in its arguments.

    Returns:
        The PMI in bits.  By convention of this class, ``0.0`` (not ``-inf``)
        is returned when the phrases are identical, when the pair has no
        recorded co-occurrence, when either phrase has no recorded count, or
        when ``self.total_docs`` is zero.
    """
    if a == b:
        return 0.0
    # Pair keys are looked up in lexicographic order.
    key = (a, b) if a < b else (b, a)
    pair_count = self.pairs.get(key, 0)
    count_a = self.phrase_count.get(a, 0)
    count_b = self.phrase_count.get(b, 0)
    total = self.total_docs
    if not (pair_count and count_a and count_b and total):
        return 0.0
    # P(a,b) / (P(a) * P(b)) simplifies to pair_count * total / (count_a * count_b);
    # the single ratio of exact integers avoids three intermediate float divisions.
    return math.log2(pair_count * total / (count_a * count_b))


def top_pmi_for(self, phrase: str, limit: int = 10, min_freq: int = 1) -> list[tuple[str, float]]:
    """Return the co-occurrence partners of *phrase*, highest PMI first.

    Args:
        phrase: Phrase whose partners are wanted.
        limit: Maximum number of results; zero or a negative value yields
            an empty list.
        min_freq: Partners whose ``phrase_count`` is below this are skipped.

    Returns:
        ``(partner, pmi)`` tuples sorted by descending PMI.  Partners with
        equal PMI keep the order in which their pairs appear in ``self.pairs``.
    """
    # Only the partner names matter; an insertion-ordered dict de-duplicates
    # them while keeping tie order deterministic.
    partners = dict.fromkeys(
        b if a == phrase else a
        for a, b in self.pairs
        if phrase in (a, b)
    )
    scored = [
        (partner, self.pmi(phrase, partner))
        for partner in partners
        if self.phrase_count.get(partner, 0) >= min_freq
    ]
    scored.sort(key=lambda item: item[1], reverse=True)
    # A negative slice bound would silently drop the tail; clamp instead.
    return scored[:max(0, limit)]
+++ b/quale/cli.py @@ -5262,12 +5262,19 @@ def ci_trend_wrapper( @core_app.command(name="risk", rich_help_panel="Code Analysis") def risk_cmd( path: Annotated[str, typer.Option("--path", "-p", help="Path to repo")] = ".", - mode: Annotated[str, typer.Option("--mode", "-m", help="Mode: full, hub, capillary")] = "full", + mode: Annotated[str, typer.Option("--mode", "-m", help="Mode: full, hub, capillary, co-change, anomaly")] = "full", format: Annotated[str, typer.Option("--format", "-f", help="Output: human, json")] = "human", ci: Annotated[bool, typer.Option("--ci", help="CI gate mode")] = False, + top_n: Annotated[int, typer.Option("--top-n", help="Results limit")] = 10, ) -> None: - """Surface risky files — hub, capillary, and their intersection.""" - from quale.reports import capillary_report, thanatosis_report, vulnerability_report + """Surface risky files — hub, capillary, co-change prediction, or anomaly detection.""" + from quale.reports import ( + anomaly_report, + capillary_report, + co_change_report, + thanatosis_report, + vulnerability_report, + ) p = os.path.abspath(path) if not vgit.is_repo(p): typer.echo("Not a git repository.", err=True) @@ -5280,6 +5287,12 @@ def risk_cmd( data = capillary_report(path=p) dt, cr = [], [] ch = [c["file"] for c in data.get("capillaries", [])] + elif mode == "co-change": + data = co_change_report(path=p, top_n=top_n) + dt, ch, cr = [], [], data.get("files", []) + elif mode == "anomaly": + data = anomaly_report(path=p, top_n=top_n) + dt, ch, cr = [], [], data.get("anomalies", []) else: data = vulnerability_report(path=p) dt = data.get("don_touch", []) @@ -5290,6 +5303,9 @@ def risk_cmd( raise typer.Exit(1) result = {"critical_files": cr, "hub_files": dt, "capillary_files": ch} if format == "json": + if mode in ("co-change", "anomaly"): + typer.echo(json.dumps(data, indent=2)) + return typer.echo(json.dumps(result, indent=2)) if ci and cr: raise typer.Exit(1) @@ -5297,6 +5313,45 @@ def risk_cmd( typer.echo(f"{ICON_CHECK} 
No critical files — CI gate passed") if format == "json": return + if mode == "co-change": + seen_files = set() + for entry in data.get("files", []): + target = entry.get("file", "") + first = True + for cc in entry.get("co_changes", []): + partner = cc.get("file", "") + if partner in seen_files: + continue + seen_files.add(partner) + if first: + typer.echo(f"{ICON_PRIMARY} Co-change prediction for {target}:") + first = False + pmi = cc.get("pmi", 0) + cop = cc.get("co_change_prob", 0) + fs = cc.get("fused_score", 0) + typer.echo(f" {partner} (pmi={pmi}, co-change={cop}, fused={fs})") + if not seen_files: + typer.echo("No co-change predictions above threshold.") + return + if mode == "anomaly": + stats = data.get("statistics", {}) + anomalies = data.get("anomalies", []) + if stats: + typer.echo(f"{ICON_PRIMARY} PMI anomalies (mean={stats.get('mean_pmi')}, max={stats.get('max_pmi')}, top {len(anomalies)})") + for a in anomalies: + a_str = a.get("a", "") + b_str = a.get("b", "") + pmi_val = a.get("pmi", 0) + ta = a.get("p_a", 0) + tb = a.get("p_b", 0) + typer.echo(f" {ICON_WARN} {a_str} ↔ {b_str} PMI={pmi_val} P(a)={ta}, P(b)={tb}") + if not anomalies: + typer.echo("No structural anomalies found.") + return + if ci and cr: + raise typer.Exit(1) + if ci and not cr: + typer.echo(f"{ICON_CHECK} No critical files — CI gate passed") if cr: typer.echo(f"{ICON_WARN} Critical (Hub + Capillary):") for f in cr: @@ -5322,11 +5377,11 @@ def verify_cmd( files: Annotated[list[str], typer.Option("--files", help="Changed file(s)")] = None, diff: Annotated[str | None, typer.Option("--diff", help="Git ref")] = None, task: Annotated[str | None, typer.Option("--task", "-t", help="Task description")] = None, - mode: Annotated[str, typer.Option("--mode", "-m", help="Mode: mc, scope, packet, full")] = "full", + mode: Annotated[str, typer.Option("--mode", "-m", help="Mode: mc, scope, packet, incomplete, full")] = "full", format: Annotated[str, typer.Option("--format", "-f", help="Output: 
def co_change_report(
    path: str = ".",
    files: list[str] | None = None,
    top_n: int = 10,
    min_pmi: float = 0.5,
) -> dict:
    """Predict which files co-change by fusing identifier PMI with git history.

    For every target file, the capitalised identifiers in its vocabulary are
    looked up in the scan's co-occurrence matrix.  Files containing a
    high-PMI partner identifier become candidates, and each candidate is
    scored as ``0.3 * best_pmi + 0.7 * co_change_probability`` using the
    pairs returned by ``entanglement_matrix``.

    Args:
        path: Repository root.
        files: Target files; defaults to every scanned file.  In both cases
            only the first ``top_n * 2`` targets are analysed.
        top_n: Maximum number of partners reported per target.
        min_pmi: Partner identifiers below this PMI are ignored.  A candidate
            without git co-change history is kept only when its best PMI is
            at least twice this value.

    Returns:
        A report dict with ``files`` (one entry per target that has
        predictions, richest first), or ``{"error": ...}`` when *path* is not
        a git repository, the scan fails, or there is no co-occurrence data.
    """
    from quale.scanner import scan_codebase

    if not vgit.is_repo(path):
        return {"error": "Not a git repository."}
    path = os.path.abspath(path)
    try:
        analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30, deep=True)
    except Exception as e:
        return {"error": f"scan failed: {e}"}

    co_matrix = analysis.co_occurrence
    if not co_matrix or not co_matrix.pairs:
        return {"error": "no co-occurrence data"}

    matrix = entanglement_matrix(path, lookback_commits=200)

    # Tokenise every file once: identifier -> files, and file -> identifiers.
    token_re = re.compile(r'\b[A-Z][a-zA-Z0-9_]{4,40}\b')
    id_to_files: dict[str, set[str]] = {}
    ids_by_file: dict[str, set[str]] = {}
    for fv in analysis.file_vocabs:
        tokens = {
            m.group()
            for phrase in fv.vocabulary
            for m in token_re.finditer(phrase)
        }
        for token in tokens:
            id_to_files.setdefault(token, set()).add(fv.path)
        # The first vocabulary seen for a path is the one used for that target.
        ids_by_file.setdefault(fv.path, tokens)
    # Sorted so report contents do not depend on set iteration order.
    files_for_id = {token: sorted(paths) for token, paths in id_to_files.items()}

    # Store each git pair under its ordered key: the lookup below always
    # queries with (smaller, larger), whatever order the matrix emitted.
    co_change_by_pair: dict[tuple[str, str], dict] = {}
    for p in matrix.get("pairs", []):
        a, b = p["file_a"], p["file_b"]
        co_change_by_pair[(a, b) if a < b else (b, a)] = p

    target_files = files or [fv.path for fv in analysis.file_vocabs]
    target_files = target_files[:top_n * 2]
    results = []
    # top_pmi_for depends only on the identifier, so share it across targets.
    pmi_partners: dict[str, list[tuple[str, float]]] = {}

    for target in target_files:
        target_ids = ids_by_file.get(target)
        if not target_ids:
            continue

        # Map each target identifier's high-PMI partners to the files holding them.
        partner_scores: dict[str, dict] = {}
        best_raw: dict[str, float] = {}
        for tid in sorted(target_ids):
            if tid not in pmi_partners:
                pmi_partners[tid] = co_matrix.top_pmi_for(tid, limit=10, min_freq=2)
            for partner, pmi_val in pmi_partners[tid]:
                if pmi_val < min_pmi:
                    continue
                for fpath in files_for_id.get(partner, ()):
                    if fpath == target:
                        continue
                    entry = partner_scores.setdefault(
                        fpath, {"pmi": 0.0, "best_pmi_tokens": [], "shared_ids": []}
                    )
                    # Compare unrounded values so a slightly lower PMI cannot
                    # displace the true best after rounding.
                    if pmi_val > best_raw.get(fpath, 0.0):
                        best_raw[fpath] = pmi_val
                        entry["pmi"] = round(pmi_val, 2)
                        entry["best_pmi_tokens"] = [tid, partner]
                    if partner not in entry["shared_ids"]:
                        entry["shared_ids"].append(partner)

        # Fuse with git co-change history.
        for partner, entry in partner_scores.items():
            pair = (target, partner) if target < partner else (partner, target)
            cc = co_change_by_pair.get(pair, {})
            cc_prob = cc.get("co_change_probability", 0.0)
            entry["co_change_count"] = cc.get("co_change_count", 0)
            entry["co_change_prob"] = cc_prob
            entry["last_seen"] = cc.get("last_seen", "")
            entry["fused_score"] = round(entry["pmi"] * 0.3 + cc_prob * 0.7, 2)

        # Keep candidates backed by history or by a clearly strong PMI.
        scored = [
            {"file": p, **v}
            for p, v in partner_scores.items()
            if v["co_change_count"] > 0 or v["pmi"] >= min_pmi * 2
        ]
        scored.sort(key=lambda x: -x["fused_score"])

        if scored:
            results.append({
                "file": target,
                "co_changes": scored[:top_n],
            })

    results.sort(key=lambda r: len(r["co_changes"]), reverse=True)
    return {
        "schema_version": 1,
        "path": path,
        "total_files": len(target_files),
        "files_with_predictions": len(results),
        "files": results,
        "guardrails": {
            "mode": "report_only",
            "caveat": "Predictions are structural hints, not proof of dependency.",
        },
    }
def anomaly_report(path: str = ".", top_n: int = 10) -> dict:
    """Report identifier pairs whose PMI is unexpectedly high.

    PMI = log2(P(a,b) / P(a)P(b)); a high value means two identifiers
    co-occur far more often than their individual frequencies predict.

    A pair is scored only when both identifiers occur in the vocabulary of
    at least one file with a recognised source-code extension, neither is a
    common documentation heading word, both reach the minimum frequency
    (``max(2, total_docs // 20)``), and the PMI is at least 1.0.

    Returns:
        A report dict whose ``anomalies`` holds the ``top_n`` highest-PMI
        pairs and whose ``statistics`` summarises every scored pair (empty
        when nothing qualified), or ``{"error": ...}`` when *path* is not a
        git repository, the scan fails, or there is no co-occurrence data.
    """
    from quale.scanner import scan_codebase

    if not vgit.is_repo(path):
        return {"error": "Not a git repository."}
    path = os.path.abspath(path)
    try:
        analysis = scan_codebase(path, quiet=True, max_files=2500, max_seconds=30, deep=True)
    except Exception as e:
        return {"error": f"scan failed: {e}"}

    co_matrix = analysis.co_occurrence
    if not co_matrix or not co_matrix.pairs:
        return {"error": "no co-occurrence data; repo may have too few files"}

    # File extensions treated as source code.
    source_suffixes = frozenset({".go", ".ts", ".tsx", ".js", ".jsx", ".py", ".rs",
                                 ".rb", ".java", ".kt", ".swift", ".c", ".cpp", ".h", ".hpp",
                                 ".cs", ".scala", ".zig", ".hs", ".ex", ".exs", ".clj", ".cljs"})
    # Words that match the identifier pattern but are excluded as prose.
    prose_words = frozenset({
        "Getting", "Started", "Actions", "Onboarding", "Usage", "Example",
        "Installation", "Configuration", "Overview", "Contributing",
        "Features", "Quickstart", "Reference", "Guide", "Tutorial",
        "Description", "Details", "Information", "Notes", "Warning",
        "Changelog", "Readme", "License", "Contributor",
        "Documentation", "Setup", "Requirements", "Support", "Security",
    })
    identifier_re = re.compile(r'\b[A-Z][a-zA-Z0-9_]{4,40}\b')

    # Collect identifiers seen in code files.
    code_ids: set[str] = set()
    for fv in analysis.file_vocabs:
        suffix = os.path.splitext(fv.path)[1].lower()
        if suffix in source_suffixes:
            for phrase in fv.vocabulary:
                code_ids.update(
                    found.group()
                    for found in identifier_re.finditer(phrase)
                    if found.group() not in prose_words
                )

    total_docs = co_matrix.total_docs
    freq_floor = max(2, total_docs // 20)
    counts = co_matrix.phrase_count
    scored: list[dict] = []
    for (a, b), pair_count in co_matrix.pairs.items():
        if a == b or a not in code_ids or b not in code_ids:
            continue
        count_a = counts.get(a, 0)
        count_b = counts.get(b, 0)
        if min(count_a, count_b) < freq_floor:
            continue
        pmi_val = co_matrix.pmi(a, b)
        if pmi_val >= 1.0:
            scored.append({
                "a": a,
                "b": b,
                "pmi": round(pmi_val, 2),
                "pair_count": pair_count,
                "p_a": round(count_a / total_docs, 3),
                "p_b": round(count_b / total_docs, 3),
            })

    scored.sort(key=lambda row: row["pmi"], reverse=True)

    stats: dict = {}
    if scored:
        pmi_values = [row["pmi"] for row in scored]
        stats = {
            "mean_pmi": round(sum(pmi_values) / len(pmi_values), 2),
            "max_pmi": max(pmi_values),
            "min_pmi": min(pmi_values),
            "total_pairs_scored": len(scored),
        }

    return {
        "schema_version": 1,
        "path": path,
        "total_identifiers": len(counts),
        "total_files": total_docs,
        "statistics": stats,
        "anomalies": scored[:top_n],
        "guardrails": {
            "mode": "report_only",
            "caveat": "Anomalies are statistical outliers, not proof of bugs.",
        },
    }
def incomplete_change_report(
    path: str = ".",
    changed_files: list[str] | None = None,
    diff_ref: str | None = None,
    threshold: float = 0.5,
    max_results: int = 10,
) -> dict:
    """Flag files with high co-change affinity that a change set left untouched.

    The changed files (from *diff_ref*, else *changed_files*) are passed to
    ``co_change_report``; every predicted partner whose ``fused_score`` is at
    least *threshold* and that is not itself in the change set is reported.

    Args:
        path: Repository root.
        changed_files: Explicit change set; used only when *diff_ref* is unset.
        diff_ref: Git ref to diff the worktree against; takes precedence.
        threshold: Minimum fused score for a partner to be flagged.
        max_results: Cap on the ``missing_files`` list (``total_missing``
            still counts every flagged file).

    Returns:
        A report dict with ``missing_files`` (best score per file, highest
        first), or ``{"error": ...}`` when *path* is not a git repository, no
        change set is given, the diff fails, or the co-change scan fails.
    """
    if not vgit.is_repo(path):
        return {"error": "Not a git repository."}
    path = os.path.abspath(path)

    if diff_ref:
        try:
            changed = vgit.diff_worktree(path, diff_ref)
        except Exception as e:
            return {"error": str(e)}
    elif changed_files:
        changed = _normalize_preflight_files(path, changed_files)
    else:
        return {"error": "provide --files or --diff"}

    guardrails = {
        "mode": "report_only",
        "caveat": "Missing files are structural hints, not required additions.",
    }

    # De-duplicate while keeping first-seen order.
    changed = list(dict.fromkeys(changed))
    if not changed:
        # Same keys as the normal payload so consumers need no special case.
        return {
            "schema_version": 1,
            "path": path,
            "changed_files": [],
            "total_missing": 0,
            "missing_files": [],
            "threshold": threshold,
            "guardrails": guardrails,
            "note": "no changed files",
        }

    try:
        co_data = co_change_report(path, files=changed, top_n=20)
    except Exception as e:
        return {"error": f"co-change scan failed: {e}"}
    # co_change_report signals failure with an error dict rather than by
    # raising; surface it instead of reporting "nothing missing".
    if "error" in co_data:
        return {"error": f"co-change scan failed: {co_data['error']}"}

    changed_set = set(changed)
    missing: list[dict] = []
    for entry in co_data.get("files", []):
        target = entry.get("file", "")
        for cc in entry.get("co_changes", []):
            partner = cc.get("file", "")
            if partner in changed_set:
                continue
            fused = cc.get("fused_score", 0.0)
            if fused >= threshold:
                missing.append({
                    "file": partner,
                    "changed_file": target,
                    "fused_score": fused,
                    "pmi": cc.get("pmi", 0.0),
                    "co_change_prob": cc.get("co_change_prob", 0.0),
                    "reason": f"co-changes with {target} (score={fused})",
                })

    # Highest score first, then keep only the strongest entry per file.
    missing.sort(key=lambda x: -x["fused_score"])
    seen: set[str] = set()
    deduped: list[dict] = []
    for m in missing:
        if m["file"] not in seen:
            seen.add(m["file"])
            deduped.append(m)

    return {
        "schema_version": 1,
        "path": path,
        "changed_files": changed,
        "total_missing": len(deduped),
        "missing_files": deduped[:max(0, max_results)],
        "threshold": threshold,
        "guardrails": guardrails,
    }
class TestCoChangeReport(unittest.TestCase):
    """Tests for PMI-enabled co-change and incomplete-change detection.

    The positive-path tests scan the current working directory (``"."``),
    so they presumably expect to be run from the repository root — confirm
    against the test runner configuration.
    """

    def setUp(self):
        # Empty the scanner cache before each test.
        _SCAN_CACHE.clear()

    def test_co_change_report_returns_predictions(self):
        from quale.reports import co_change_report
        data = co_change_report(".")
        self.assertNotIn("error", data, msg=data.get("error", ""))
        self.assertIn("files_with_predictions", data)
        self.assertIsInstance(data.get("files"), list)

    def test_co_change_report_json_output(self):
        """The report payload carries its top-level schema keys."""
        from quale.reports import co_change_report
        data = co_change_report(".", top_n=5)
        self.assertIn("schema_version", data)
        self.assertIn("files", data)
        self.assertIn("total_files", data)

    def test_co_change_report_invalid_path(self):
        from quale.reports import co_change_report
        data = co_change_report("/nonexistent/path")
        self.assertIn("error", data)

    def test_anomaly_report_returns_anomalies(self):
        from quale.reports import anomaly_report
        data = anomaly_report(".")
        self.assertNotIn("error", data, msg=data.get("error", ""))
        self.assertIn("anomalies", data)
        self.assertIn("statistics", data)

    def test_anomaly_report_statistics(self):
        from quale.reports import anomaly_report
        data = anomaly_report(".", top_n=5)
        stats = data.get("statistics", {})
        # Statistics are empty when no pair qualified; only check when present.
        if stats:
            self.assertIn("mean_pmi", stats)
            self.assertIn("max_pmi", stats)
            self.assertGreaterEqual(stats.get("max_pmi", 0), stats.get("min_pmi", 0))

    def test_anomaly_report_invalid_path(self):
        from quale.reports import anomaly_report
        data = anomaly_report("/nonexistent/path")
        self.assertIn("error", data)

    def test_incomplete_change_report_detects_missing_files(self):
        from quale.reports import incomplete_change_report
        # Run against this repository itself; only the response shape is checked.
        data = incomplete_change_report(".", changed_files=["quale/cli.py"])
        self.assertNotIn("error", data, msg=data.get("error", ""))
        self.assertIn("missing_files", data)
        self.assertIn("total_missing", data)
        self.assertIsInstance(data.get("missing_files"), list)

    def test_incomplete_change_report_requires_args(self):
        from quale.reports import incomplete_change_report
        data = incomplete_change_report(".")
        self.assertIn("error", data)

    def test_incomplete_change_report_invalid_path(self):
        from quale.reports import incomplete_change_report
        data = incomplete_change_report("/nonexistent/path", changed_files=["a.ts"])
        self.assertIn("error", data)

    def test_incomplete_change_report_no_changes(self):
        from quale.reports import incomplete_change_report
        # An empty list is falsy, so this counts as "no change set provided".
        data = incomplete_change_report(".", changed_files=[])
        self.assertIn("error", data)
class TestPmiMatrix(unittest.TestCase):
    """Tests for PMI methods on CoOccurrenceMatrix."""

    def setUp(self):
        from quale.analyze import CoOccurrenceMatrix
        # Four small "files": Foo/Bar co-occur twice, Foo/Quux never.
        self.m = CoOccurrenceMatrix()
        self.m.add_file({"Foo", "Bar", "Baz"})
        self.m.add_file({"Foo", "Bar"})
        self.m.add_file({"Bar", "Baz"})
        self.m.add_file({"Quux", "Baz"})

    def test_pmi_symmetric(self):
        pmi_ab = self.m.pmi("Foo", "Bar")
        pmi_ba = self.m.pmi("Bar", "Foo")
        self.assertAlmostEqual(pmi_ab, pmi_ba)

    def test_pmi_zero_for_no_cooccurrence(self):
        pmi = self.m.pmi("Foo", "Quux")
        self.assertEqual(pmi, 0.0)

    def test_pmi_zero_for_identical(self):
        pmi = self.m.pmi("Foo", "Foo")
        self.assertEqual(pmi, 0.0)

    def test_pmi_positive_for_cooccurring(self):
        pmi = self.m.pmi("Foo", "Bar")
        self.assertGreater(pmi, 0.0)

    def test_top_pmi_for_returns_partners(self):
        results = self.m.top_pmi_for("Foo", limit=3)
        self.assertGreaterEqual(len(results), 1)
        self.assertLessEqual(len(results), 3)
        for phrase, score in results:
            self.assertIsInstance(phrase, str)
            self.assertIsInstance(score, float)

    def test_top_pmi_for_sorted_descending(self):
        scores = [score for _, score in self.m.top_pmi_for("Foo", limit=10)]
        self.assertEqual(scores, sorted(scores, reverse=True))

    def test_top_pmi_for_respects_limit(self):
        results = self.m.top_pmi_for("Foo", limit=1)
        self.assertEqual(len(results), 1)

    def test_top_pmi_for_empty_returns_empty(self):
        results = self.m.top_pmi_for("NonexistentPhrase", limit=3)
        self.assertEqual(results, [])