diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4d61e89 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ + +.venv/ + +*.pyc + +codra.egg-info/ + +result.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..9130c5b --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug codra (-m)", + "type": "python", + "request": "launch", + "cwd": "${workspaceFolder}", + "env": { + "PYTHONPATH": "${workspaceFolder}" + }, + "module": "codra.cli", + "args": [ + "${workspaceFolder}/codra", + "--threshold-csa", "5", + "--threshold-id", "2", + "--threshold-bps", "0.7" + ], + "console": "integratedTerminal", + "justMyCode": false + } + ] +} diff --git a/README.md b/README.md index 8b13789..1e706e6 100644 --- a/README.md +++ b/README.md @@ -1 +1,40 @@ +# Codra +## Requisiti + +- Python 3.10+ +- Node.js 18+ (solo per il viewer) + +## Installazione (core Python) + +Assicurati che il tuo `python3.10` sia disponibile nel PATH. + +```bash +python3.10 -m venv .venv +source .venv/bin/activate +pip install -e . +``` + +## Esecuzione CLI + +Analizza un percorso e stampa il report JSON su stdout: + +```bash +python3 -m codra.cli /percorso/progetto +``` + +Esempio con soglie: + +```bash +python3 -m codra.cli /percorso/progetto --threshold-csa 10 --threshold-id 2 --threshold-bps 0.7 +``` + +## Viewer React (opzionale) + +```bash +cd viewer +npm install +npm run dev +``` + +Carica il file JSON generato dalla CLI tramite il file input dell'interfaccia. 
@dataclass
class AliasCollector(ast.NodeVisitor):
    """Record simple ``name = other_name`` rebindings found outside defs.

    Only assignments whose right-hand side is a bare name are recorded, and
    only while ``depth`` equals 1, i.e. while a ``Module`` node is being
    walked. Function and class bodies are never entered.
    """

    # Maps the bound name to the name it was assigned from.
    aliases: dict[str, str] = field(default_factory=dict)
    # Incremented while a Module body is being traversed.
    depth: int = 0

    def collect(self, tree: ast.AST) -> dict[str, str]:
        """Walk *tree* and return a copy of the alias table."""
        self.visit(tree)
        return self.aliases.copy()

    def visit_Module(self, node: ast.Module) -> None:
        """Visit each top-level statement with ``depth`` raised by one."""
        self.depth += 1
        for child in node.body:
            self.visit(child)
        self.depth -= 1

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Do not descend into function bodies."""

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Do not descend into coroutine bodies."""

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Do not descend into class bodies."""

    def visit_Assign(self, node: ast.Assign) -> None:
        """Record every plain-name target of ``t1 = t2 = source_name``."""
        source = node.value
        if self.depth == 1 and isinstance(source, ast.Name):
            self.aliases.update(
                (target.id, source.id)
                for target in node.targets
                if isinstance(target, ast.Name)
            )

    def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
        """Record ``target: annotation = source_name`` when both are names."""
        target, source = node.target, node.value
        if (
            self.depth == 1
            and isinstance(target, ast.Name)
            and isinstance(source, ast.Name)
        ):
            self.aliases[target.id] = source.id
@dataclass
class CallCollector(ast.NodeVisitor):
    """Gather the names of plain-name calls made directly in a body.

    Calls whose callee is not a bare name (for example ``obj.method()``) are
    not recorded, although their arguments are still searched. Nested
    function and class definitions are not entered.
    """

    # Callee names in the order the visitor encounters them (duplicates kept).
    calls: list[str] = field(default_factory=list)

    def collect(self, node: ast.AST) -> list[str]:
        """Visit each statement of ``node.body`` and return a copy of the names."""
        for child in node.body:
            self.visit(child)
        return self.calls[:]

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Do not descend into nested functions."""

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Do not descend into nested coroutines."""

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Do not descend into nested classes."""

    def visit_Call(self, node: ast.Call) -> None:
        """Record the callee when it is a bare name, then keep descending."""
        callee = node.func
        if isinstance(callee, ast.Name):
            self.calls.append(callee.id)
        # Arguments (and the callee expression) may contain further calls.
        self.generic_visit(node)
@dataclass
class ConditionMetricsCollector(ast.NodeVisitor):
    """Count the constructs that appear inside one condition expression.

    ``local_names`` is the set of names treated as local; any other name that
    is read counts as an external reference.
    """

    local_names: set[str]
    bool_ops: int = 0
    compare_ops: int = 0
    calls: int = 0
    external_refs: int = 0

    def collect(self, node: ast.AST) -> ConditionMetrics:
        """Walk *node* and return the accumulated counters."""
        self.visit(node)
        return ConditionMetrics(
            bool_ops=self.bool_ops,
            compare_ops=self.compare_ops,
            calls=self.calls,
            external_refs=self.external_refs,
        )

    def visit_BoolOp(self, node: ast.BoolOp) -> None:
        """Add one per operator: ``a and b and c`` contributes two."""
        operator_count = len(node.values) - 1
        if operator_count > 0:
            self.bool_ops += operator_count
        self.generic_visit(node)

    def visit_Compare(self, node: ast.Compare) -> None:
        """Add one per comparison operator in a (possibly chained) compare."""
        self.compare_ops += len(node.ops)
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:
        """Add one per call expression, including nested ones."""
        self.calls += 1
        self.generic_visit(node)

    def visit_Name(self, node: ast.Name) -> None:
        """Add one for each read of a name that is not in ``local_names``."""
        if not isinstance(node.ctx, ast.Load):
            return
        if node.id in self.local_names:
            return
        self.external_refs += 1
@dataclass
class DirectoryScanner:
    """Scan every ``.py`` file under a directory tree for unit definitions."""

    # Performs the per-file scan; a fresh PythonFileScanner by default.
    file_scanner: PythonFileScanner = field(default_factory=PythonFileScanner)

    def scan(self, root_path: str) -> list[UnitDefinition]:
        """Return the units of all ``.py`` files below *root_path*.

        Directories and file names are visited in sorted order, so the result
        order does not depend on the order the file system lists entries in.
        """
        collected: list[UnitDefinition] = []
        for folder, subfolders, names in os.walk(root_path):
            # Sorting in place steers the order in which os.walk descends.
            subfolders.sort()
            names.sort()
            python_files = (name for name in names if name.endswith(".py"))
            for name in python_files:
                collected.extend(
                    self.file_scanner.scan_file(os.path.join(folder, name))
                )
        return collected
@dataclass
class FunctionDefinitionCollector(ast.NodeVisitor):
    """Collect the names of functions that are not defined inside a class.

    Functions nested in other functions are included. Anything defined while
    a class is on ``class_stack`` (methods and their inner functions) is not.
    """

    names: set[str] = field(default_factory=set)
    # Names of the classes currently being traversed, outermost first.
    class_stack: list[str] = field(default_factory=list)

    def collect(self, tree: ast.AST) -> set[str]:
        """Walk *tree* and return a copy of the collected names."""
        self.visit(tree)
        return self.names.copy()

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Traverse the class body with the class pushed on the stack."""
        self.class_stack.append(node.name)
        self._visit_body(node)
        self.class_stack.pop()

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Handle a ``def`` statement."""
        self._register(node)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Handle an ``async def`` statement."""
        self._register(node)

    def _register(self, node: ast.AST) -> None:
        """Record the function unless a class encloses it, then recurse."""
        inside_class = len(self.class_stack) > 0
        if not inside_class:
            self.names.add(node.name)
        self._visit_body(node)

    def _visit_body(self, node: ast.AST) -> None:
        """Visit only the statements of ``node.body``."""
        for child in node.body:
            self.visit(child)
@dataclass
class FunctionSymbolCollector(ast.NodeVisitor):
    """Collect the names one function body binds and the names it reads.

    Results accumulate in ``usage``: parameters and assigned names go to
    ``locals``, every name that is read goes to ``used_names``, and attributes
    read through ``self`` go to ``self_fields_read``. Nested function and
    class definitions are not entered.
    """

    usage: FunctionSymbolUsage = field(default_factory=FunctionSymbolUsage)

    def collect(self, node: ast.AST) -> FunctionSymbolUsage:
        """Analyse a function node and return the populated usage record.

        Args:
            node: a node exposing ``args`` and ``body`` (a function definition).
        """
        self._add_arguments(node)
        for statement in node.body:
            self.visit(statement)
        return self.usage

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Do not descend into nested functions."""
        return None

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Do not descend into nested coroutines."""
        return None

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Do not descend into nested classes."""
        return None

    def visit_Global(self, node: ast.Global) -> None:
        """Remember names declared ``global`` so stores are not counted as locals."""
        self.usage.global_decls.update(node.names)

    def visit_Nonlocal(self, node: ast.Nonlocal) -> None:
        """Remember names declared ``nonlocal`` so stores are not counted as locals."""
        self.usage.nonlocal_decls.update(node.names)

    def visit_Name(self, node: ast.Name) -> None:
        """Classify a name as read (Load) or locally bound (Store/Del)."""
        if isinstance(node.ctx, ast.Load):
            self.usage.used_names.add(node.id)
        elif isinstance(node.ctx, (ast.Store, ast.Del)):
            # Names declared global/nonlocal are bound in another scope.
            if node.id in self.usage.global_decls:
                return
            if node.id in self.usage.nonlocal_decls:
                return
            self.usage.locals.add(node.id)

    def visit_Attribute(self, node: ast.Attribute) -> None:
        """Record ``self.<attr>`` reads, then visit the underlying expression."""
        if isinstance(node.ctx, ast.Load):
            if isinstance(node.value, ast.Name) and node.value.id == "self":
                self.usage.self_fields_read.add(node.attr)
        self.generic_visit(node)

    def visit_ExceptHandler(self, node: ast.ExceptHandler) -> None:
        """Handle ``except <type> as <name>:`` clauses.

        The exception type expression is visited so the names it reads are
        recorded as used, like any other expression in the body. The ``as``
        name is recorded as a local.
        """
        if node.type is not None:
            # e.g. `except (KeyError, errors.Custom):` reads KeyError and errors.
            self.visit(node.type)
        if isinstance(node.name, str):
            self.usage.locals.add(node.name)
        elif node.name is not None:
            # Kept for trees where the bound name is itself a node.
            self.visit(node.name)
        for statement in node.body:
            self.visit(statement)

    def _add_arguments(self, node: ast.AST) -> None:
        """Register every parameter of the function as a local name."""
        arguments = node.args
        for arg in arguments.posonlyargs:
            self.usage.locals.add(arg.arg)
        for arg in arguments.args:
            self.usage.locals.add(arg.arg)
        for arg in arguments.kwonlyargs:
            self.usage.locals.add(arg.arg)
        if arguments.vararg is not None:
            self.usage.locals.add(arguments.vararg.arg)
        if arguments.kwarg is not None:
            self.usage.locals.add(arguments.kwarg.arg)
@dataclass
class IndirectionAnalyzer:
    """Compute call-indirection depth for every unit found in a file.

    Only calls to plain names are considered. A call is "resolved" when its
    name, after following module-level aliases, is a function defined in the
    same file outside any class; all other call names are reported as
    unresolved.
    """

    def analyze_file(self, file_path: str) -> list[IndirectionResult]:
        """Parse *file_path* and return one result per unit.

        For each unit, every resolved call contributes ``1 + depth(callee)``;
        ``id_max`` and ``id_avg`` are the maximum and mean of those values
        (``0`` / ``0.0`` when the unit makes no resolved call).

        Raises:
            OSError: if the file cannot be read.
            SyntaxError: if the file is not valid Python.
        """
        with open(file_path, "r", encoding="utf-8") as handle:
            source = handle.read()
        tree = ast.parse(source, filename=file_path)
        function_names = FunctionDefinitionCollector().collect(tree)
        aliases = AliasCollector().collect(tree)
        call_graph = self._build_call_graph(tree, function_names, aliases)
        # Shared across units; only holds values that are valid for any query.
        depth_cache: dict[str, int] = {}
        results: list[IndirectionResult] = []
        unit_collector = UnitNodeCollector(file_path=file_path)
        unit_collector.visit(tree)
        for unit_node in unit_collector.units:
            call_names = CallCollector().collect(unit_node.node)
            resolved_calls: list[str] = []
            unresolved_calls: set[str] = set()
            for name in call_names:
                resolved = self._resolve_alias(name, aliases)
                if resolved in function_names:
                    resolved_calls.append(resolved)
                else:
                    # Report the name as written, not the alias target.
                    unresolved_calls.add(name)
            call_depths = [
                1 + self._depth(call_name, call_graph, depth_cache)
                for call_name in resolved_calls
            ]
            if call_depths:
                id_max = max(call_depths)
                id_avg = sum(call_depths) / len(call_depths)
            else:
                id_max = 0
                id_avg = 0.0
            results.append(
                IndirectionResult(
                    unit=unit_node.definition,
                    id_max=id_max,
                    id_avg=id_avg,
                    unresolved_calls=sorted(unresolved_calls),
                )
            )
        return results

    def _build_call_graph(
        self,
        tree: ast.AST,
        function_names: set[str],
        aliases: dict[str, str],
    ) -> dict[str, list[str]]:
        """Map each known function name to the known functions it calls.

        Every name in *function_names* gets an entry, but only functions that
        are direct children of ``tree.body`` have their calls collected.
        """
        call_graph: dict[str, list[str]] = {name: [] for name in function_names}
        for node in tree.body:
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            if node.name not in function_names:
                continue
            for name in CallCollector().collect(node):
                resolved = self._resolve_alias(name, aliases)
                if resolved in function_names:
                    call_graph[node.name].append(resolved)
        return call_graph

    def _resolve_alias(self, name: str, aliases: dict[str, str]) -> str:
        """Follow the alias chain from *name*, stopping at a cycle."""
        seen: set[str] = set()
        current = name
        while current in aliases and current not in seen:
            seen.add(current)
            current = aliases[current]
        return current

    def _depth(
        self,
        name: str,
        call_graph: dict[str, list[str]],
        depth_cache: dict[str, int],
    ) -> int:
        """Return the longest call chain below *name*, ignoring back edges.

        A callee that is already on the current traversal path (a recursion
        cycle) is skipped. A value computed while such an edge was skipped
        depends on where the traversal started, so it is kept only in a
        scratch table for this query. Only values free of skipped edges are
        written to *depth_cache*. This makes the result for a name independent
        of which other names were queried earlier with the same cache.

        Args:
            name: function whose depth is requested.
            call_graph: adjacency lists of callee names.
            depth_cache: memo shared between queries; updated in place.
        """
        if name in depth_cache:
            return depth_cache[name]

        scratch: dict[str, int] = {}  # cycle-affected values, this query only
        on_path: set[str] = set()

        def walk(current: str) -> tuple[int, bool]:
            """Return ``(depth, exact)``; *exact* is False if a cycle was cut."""
            if current in depth_cache:
                return depth_cache[current], True
            if current in scratch:
                return scratch[current], False
            callees = call_graph.get(current)
            if not callees:
                # Unknown names and leaf functions both have depth 0.
                depth_cache[current] = 0
                return 0, True
            on_path.add(current)
            depth = 0
            exact = True
            for callee in callees:
                if callee in on_path:
                    exact = False
                    continue
                callee_depth, callee_exact = walk(callee)
                depth = max(depth, 1 + callee_depth)
                exact = exact and callee_exact
            on_path.discard(current)
            if exact:
                depth_cache[current] = depth
            else:
                scratch[current] = depth
            return depth, exact

        return walk(name)[0]
+ self.generic_visit(node) + + def visit_AugAssign(self, node: ast.AugAssign) -> None: + self.symbols.update(self._extract_target_names(node.target)) + self.generic_visit(node) + + def visit_For(self, node: ast.For) -> None: + self.symbols.update(self._extract_target_names(node.target)) + self.generic_visit(node) + + def visit_AsyncFor(self, node: ast.AsyncFor) -> None: + self.symbols.update(self._extract_target_names(node.target)) + self.generic_visit(node) + + def visit_With(self, node: ast.With) -> None: + for item in node.items: + if item.optional_vars is not None: + self.symbols.update(self._extract_target_names(item.optional_vars)) + self.generic_visit(node) + + def visit_AsyncWith(self, node: ast.AsyncWith) -> None: + for item in node.items: + if item.optional_vars is not None: + self.symbols.update(self._extract_target_names(item.optional_vars)) + self.generic_visit(node) + + def visit_ExceptHandler(self, node: ast.ExceptHandler) -> None: + if isinstance(node.name, str): + self.symbols.add(node.name) + elif node.name is not None: + self.symbols.update(self._extract_target_names(node.name)) + self.generic_visit(node) + + def _extract_target_names(self, node: ast.AST) -> set[str]: + names: set[str] = set() + for target in ast.walk(node): + if isinstance(target, ast.Name): + names.add(target.id) + return names diff --git a/codra/python_file_scanner.py b/codra/python_file_scanner.py new file mode 100644 index 0000000..a836078 --- /dev/null +++ b/codra/python_file_scanner.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +import ast +from dataclasses import dataclass + +from .function_extractor import FunctionExtractor +from .unit_definition import UnitDefinition + + +@dataclass +class PythonFileScanner: + def scan_file(self, file_path: str) -> list[UnitDefinition]: + with open(file_path, "r", encoding="utf-8") as handle: + source = handle.read() + tree = ast.parse(source, filename=file_path) + extractor = FunctionExtractor(file_path=file_path) + 
@dataclass
class ReportBuilder:
    """Merge the output of the three analyzers into a single report."""

    csa_analyzer: CsaAnalyzer
    indirection_analyzer: IndirectionAnalyzer
    bps_analyzer: BpsAnalyzer
    path_collector: FilePathCollector

    def build(self, root_path: str) -> Report:
        """Analyse every file yielded by the path collector for *root_path*."""
        file_reports = [
            self._build_file_report(path)
            for path in self.path_collector.collect(root_path)
        ]
        unit_count = sum(len(entry.units) for entry in file_reports)
        return Report(
            schema_version="1.0",
            language="python",
            summary=ReportSummary(
                total_files=len(file_reports),
                total_units=unit_count,
            ),
            files=file_reports,
        )

    def _build_file_report(self, file_path: str) -> FileReport:
        """Run all three analyzers on one file and join their results per unit."""
        analyses = (
            self.csa_analyzer.analyze_file(file_path),
            self.indirection_analyzer.analyze_file(file_path),
            self.bps_analyzer.analyze_file(file_path),
        )
        csa_map, indirection_map, bps_map = (
            {self._key(result.unit): result for result in results}
            for results in analyses
        )
        # Union of the keys, first occurrence first, before sorting.
        distinct_keys = dict.fromkeys([*csa_map, *indirection_map, *bps_map])
        ordered_keys = sorted(distinct_keys, key=self._sort_key)
        unit_reports = [
            UnitReport(
                unit=self._resolve_unit(key, csa_map, indirection_map, bps_map),
                metrics=self._resolve_metrics(
                    key, csa_map, indirection_map, bps_map
                ),
            )
            for key in ordered_keys
        ]
        return FileReport(file_path=file_path, units=unit_reports)

    def check_thresholds(self, report: Report, thresholds: ThresholdConfig) -> bool:
        """Return True when no unit in *report* breaks a configured limit.

        A threshold of ``None`` disables that check. ``csa`` and
        ``indirection`` are upper bounds; ``bps`` is a lower bound.
        """
        return not any(
            self._violates(unit.metrics, thresholds)
            for file_report in report.files
            for unit in file_report.units
        )

    @staticmethod
    def _violates(metrics: UnitMetrics, thresholds: ThresholdConfig) -> bool:
        """Tell whether one unit's metrics break any enabled threshold."""
        if thresholds.csa is not None and metrics.csa_main > thresholds.csa:
            return True
        if (
            thresholds.indirection is not None
            and metrics.id_max > thresholds.indirection
        ):
            return True
        return thresholds.bps is not None and metrics.bps < thresholds.bps

    def _key(self, unit: UnitDefinition) -> UnitKey:
        """Build the hashable key used to match one unit across analyzers."""
        return UnitKey(
            file_path=unit.file_path,
            qualified_id=unit.qualified_id,
            kind=unit.kind,
            start_line=unit.start_line,
            end_line=unit.end_line,
        )

    def _sort_key(self, key: UnitKey) -> tuple[str, int, int, str]:
        """Order units by qualified id, then line span, then kind."""
        return key.qualified_id, key.start_line, key.end_line, key.kind

    def _resolve_unit(
        self,
        key: UnitKey,
        csa_map: dict[UnitKey, object],
        indirection_map: dict[UnitKey, object],
        bps_map: dict[UnitKey, object],
    ) -> UnitDefinition:
        """Return the unit from the first map holding *key*, else rebuild it."""
        holders = [
            mapping
            for mapping in (csa_map, indirection_map, bps_map)
            if key in mapping
        ]
        if holders:
            return holders[0][key].unit
        return UnitDefinition(
            file_path=key.file_path,
            qualified_id=key.qualified_id,
            kind=key.kind,
            start_line=key.start_line,
            end_line=key.end_line,
        )

    def _resolve_metrics(
        self,
        key: UnitKey,
        csa_map: dict[UnitKey, object],
        indirection_map: dict[UnitKey, object],
        bps_map: dict[UnitKey, object],
    ) -> UnitMetrics:
        """Assemble the metrics for *key*, defaulting any missing analyzer."""
        csa = csa_map.get(key)
        indirection = indirection_map.get(key)
        bps = bps_map.get(key)

        # Neutral defaults used when an analyzer produced nothing for the unit.
        csa_main, external_symbols, self_fields_read = 0, [], []
        id_max, id_avg, unresolved_calls = 0, 0.0, []
        bps_score = 1.0

        if csa:
            csa_main = csa.csa_main
            external_symbols = csa.external_symbols
            self_fields_read = csa.self_fields_read
        if indirection:
            id_max = indirection.id_max
            id_avg = indirection.id_avg
            unresolved_calls = indirection.unresolved_calls
        if bps:
            bps_score = bps.bps

        return UnitMetrics(
            csa_main=csa_main,
            external_symbols=external_symbols,
            self_fields_read=self_fields_read,
            id_max=id_max,
            id_avg=id_avg,
            unresolved_calls=unresolved_calls,
            bps=bps_score,
        )
dataclass + + +@dataclass(frozen=True) +class UnitDefinition: + file_path: str + qualified_id: str + kind: str + start_line: int + end_line: int diff --git a/codra/unit_key.py b/codra/unit_key.py new file mode 100644 index 0000000..d794e1d --- /dev/null +++ b/codra/unit_key.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class UnitKey: + file_path: str + qualified_id: str + kind: str + start_line: int + end_line: int diff --git a/codra/unit_metrics.py b/codra/unit_metrics.py new file mode 100644 index 0000000..6eddc17 --- /dev/null +++ b/codra/unit_metrics.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class UnitMetrics: + csa_main: int + external_symbols: list[str] + self_fields_read: list[str] + id_max: int + id_avg: float + unresolved_calls: list[str] + bps: float diff --git a/codra/unit_node.py b/codra/unit_node.py new file mode 100644 index 0000000..0c38fef --- /dev/null +++ b/codra/unit_node.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +import ast +from dataclasses import dataclass + +from .unit_definition import UnitDefinition + + +@dataclass(frozen=True) +class UnitNode: + definition: UnitDefinition + node: ast.AST diff --git a/codra/unit_node_collector.py b/codra/unit_node_collector.py new file mode 100644 index 0000000..56e1b64 --- /dev/null +++ b/codra/unit_node_collector.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import ast +from dataclasses import dataclass, field + +from .unit_definition import UnitDefinition +from .unit_node import UnitNode + + +@dataclass +class UnitNodeCollector(ast.NodeVisitor): + file_path: str + units: list[UnitNode] = field(default_factory=list) + class_stack: list[str] = field(default_factory=list) + function_stack: list[str] = field(default_factory=list) + + def visit_ClassDef(self, node: ast.ClassDef) -> None: + self.class_stack.append(node.name) + 
self.generic_visit(node) + self.class_stack.pop() + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + self._handle_function(node) + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: + self._handle_function(node) + + def _handle_function(self, node: ast.AST) -> None: + name = node.name + is_direct_method = bool(self.class_stack) and not self.function_stack + if is_direct_method: + qualified_id = f"{self.class_stack[-1]}.{name}" + kind = "method" + else: + parts = [] + if self.class_stack: + parts.append(self.class_stack[-1]) + if self.function_stack: + parts.extend(self.function_stack) + parts.append(name) + qualified_id = ".".join(parts) + kind = "function" + start_line = getattr(node, "lineno", 0) or 0 + end_line = getattr(node, "end_lineno", 0) or start_line + definition = UnitDefinition( + file_path=self.file_path, + qualified_id=qualified_id, + kind=kind, + start_line=start_line, + end_line=end_line, + ) + self.units.append(UnitNode(definition=definition, node=node)) + self.function_stack.append(name) + self.generic_visit(node) + self.function_stack.pop() diff --git a/codra/unit_report.py b/codra/unit_report.py new file mode 100644 index 0000000..208e656 --- /dev/null +++ b/codra/unit_report.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from .unit_definition import UnitDefinition +from .unit_metrics import UnitMetrics + + +@dataclass(frozen=True) +class UnitReport: + unit: UnitDefinition + metrics: UnitMetrics diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9e64d0b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,12 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "codra" +version = "0.1.0" +description = "AST-based static analysis tool" +requires-python = ">=3.10" + +[tool.setuptools] +packages = ["codra"] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..24fdc12 
--- /dev/null +++ b/setup.py @@ -0,0 +1,4 @@ +from setuptools import find_packages +from setuptools import setup + +setup(packages=find_packages()) diff --git a/viewer/index.html b/viewer/index.html new file mode 100644 index 0000000..89553b8 --- /dev/null +++ b/viewer/index.html @@ -0,0 +1,12 @@ + + + + + + Codra Report Viewer + + +
+ + + diff --git a/viewer/package.json b/viewer/package.json new file mode 100644 index 0000000..7d7be41 --- /dev/null +++ b/viewer/package.json @@ -0,0 +1,22 @@ +{ + "name": "codra-viewer", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "dependencies": { + "react": "^18.2.0", + "react-dom": "^18.2.0" + }, + "devDependencies": { + "@types/react": "^18.2.21", + "@types/react-dom": "^18.2.7", + "@vitejs/plugin-react": "^4.2.1", + "typescript": "^5.3.3", + "vite": "^5.0.8" + } +} diff --git a/viewer/src/App.tsx b/viewer/src/App.tsx new file mode 100644 index 0000000..c60d7c3 --- /dev/null +++ b/viewer/src/App.tsx @@ -0,0 +1,84 @@ +import { useState } from "react" +import type { Report } from "./report_types" +import { ReportTable } from "./report_table" + +export function App() { + const [report, setReport] = useState(null) + const [error, setError] = useState(null) + const [filterText, setFilterText] = useState("") + + const handleFile = (event: React.ChangeEvent) => { + const file = event.target.files?.[0] + if (!file) { + return + } + const reader = new FileReader() + reader.onload = () => { + try { + const parsed = JSON.parse(String(reader.result)) as Report + setReport(parsed) + setError(null) + } catch (err) { + setReport(null) + setError("Invalid JSON report") + } + } + reader.onerror = () => { + setReport(null) + setError("Failed to read file") + } + reader.readAsText(file) + } + + return ( +
+
+

Codra Report Viewer

+

Load a JSON report to explore CSA, ID, and BPS metrics.

+
+
+ + +
+ {error ?
{error}
: null} + {report ? ( +
+
+
+ Schema + {report.schema_version} +
+
+ Language + {report.language} +
+
+ Files + {report.summary.total_files} +
+
+ Units + {report.summary.total_units} +
+
+ +
+ ) : ( +
+

No report loaded.

+
+ )} +
+ ) +} diff --git a/viewer/src/main.tsx b/viewer/src/main.tsx new file mode 100644 index 0000000..7934547 --- /dev/null +++ b/viewer/src/main.tsx @@ -0,0 +1,10 @@ +import { createRoot } from "react-dom/client" +import { App } from "./App" +import "./styles.css" + +const container = document.getElementById("root") +if (!container) { + throw new Error("Root element not found") +} + +createRoot(container).render() diff --git a/viewer/src/report_table.tsx b/viewer/src/report_table.tsx new file mode 100644 index 0000000..907ab64 --- /dev/null +++ b/viewer/src/report_table.tsx @@ -0,0 +1,130 @@ +import { useMemo, useState } from "react" +import type { FileReport, UnitReport } from "./report_types" + +export type SortKey = "file" | "unit" | "kind" | "csa" | "id" | "bps" + +type ReportTableProps = { + files: FileReport[] + filterText: string +} + +type Row = { + file: string + unit: string + kind: string + csa: number + idMax: number + bps: number + raw: UnitReport +} + +export function ReportTable({ files, filterText }: ReportTableProps) { + const [sortKey, setSortKey] = useState("file") + const [direction, setDirection] = useState<"asc" | "desc">("asc") + + const rows = useMemo(() => { + const entries: Row[] = [] + for (const file of files) { + for (const unit of file.units) { + entries.push({ + file: file.file_path, + unit: unit.unit.qualified_id, + kind: unit.unit.kind, + csa: unit.metrics.csa_main, + idMax: unit.metrics.id_max, + bps: unit.metrics.bps, + raw: unit + }) + } + } + const needle = filterText.trim().toLowerCase() + const filtered = needle + ? entries.filter((row) => + [row.file, row.unit, row.kind] + .join(" ") + .toLowerCase() + .includes(needle) + ) + : entries + const sorted = [...filtered].sort((left, right) => { + const factor = direction === "asc" ? 
1 : -1 + if (sortKey === "file") { + return left.file.localeCompare(right.file) * factor + } + if (sortKey === "unit") { + return left.unit.localeCompare(right.unit) * factor + } + if (sortKey === "kind") { + return left.kind.localeCompare(right.kind) * factor + } + if (sortKey === "csa") { + return (left.csa - right.csa) * factor + } + if (sortKey === "id") { + return (left.idMax - right.idMax) * factor + } + return (left.bps - right.bps) * factor + }) + return sorted + }, [files, filterText, sortKey, direction]) + + const setSort = (key: SortKey) => { + if (sortKey === key) { + setDirection(direction === "asc" ? "desc" : "asc") + return + } + setSortKey(key) + setDirection("asc") + } + + return ( + + + + + + + + + + + + + {rows.map((row) => ( + + + + + + + + + ))} + +
+ + + + + + + + + + + +
{row.file}{row.unit}{row.kind}{row.csa}{row.idMax}{row.bps.toFixed(2)}
+ ) +} diff --git a/viewer/src/report_types.ts b/viewer/src/report_types.ts new file mode 100644 index 0000000..0ff3de6 --- /dev/null +++ b/viewer/src/report_types.ts @@ -0,0 +1,39 @@ +export type ReportSummary = { + total_files: number + total_units: number +} + +export type UnitDefinition = { + file_path: string + qualified_id: string + kind: string + start_line: number + end_line: number +} + +export type UnitMetrics = { + csa_main: number + external_symbols: string[] + self_fields_read: string[] + id_max: number + id_avg: number + unresolved_calls: string[] + bps: number +} + +export type UnitReport = { + unit: UnitDefinition + metrics: UnitMetrics +} + +export type FileReport = { + file_path: string + units: UnitReport[] +} + +export type Report = { + schema_version: string + language: string + summary: ReportSummary + files: FileReport[] +} diff --git a/viewer/src/styles.css b/viewer/src/styles.css new file mode 100644 index 0000000..7af25cd --- /dev/null +++ b/viewer/src/styles.css @@ -0,0 +1,122 @@ +:root { + font-family: "Inter", system-ui, sans-serif; + color: #0f172a; + background-color: #f8fafc; +} + +body { + margin: 0; + min-height: 100vh; +} + +.app { + max-width: 1200px; + margin: 0 auto; + padding: 32px 24px 64px; + display: flex; + flex-direction: column; + gap: 24px; +} + +header h1 { + margin: 0 0 8px; +} + +header p { + margin: 0; + color: #475569; +} + +.controls { + display: flex; + flex-wrap: wrap; + gap: 16px; + align-items: flex-end; +} + +.controls label { + display: flex; + flex-direction: column; + gap: 6px; + font-size: 14px; +} + +.controls input[type="text"], +.controls input[type="file"] { + padding: 8px 10px; + border-radius: 6px; + border: 1px solid #cbd5f5; + background: #ffffff; +} + +.summary { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(160px, 1fr)); + gap: 12px; + background: #ffffff; + padding: 16px; + border-radius: 12px; + box-shadow: 0 8px 24px rgba(15, 23, 42, 0.08); +} + +.summary div { + display: 
flex; + flex-direction: column; + gap: 4px; +} + +.summary strong { + font-size: 12px; + text-transform: uppercase; + letter-spacing: 0.04em; + color: #64748b; +} + +.report-table { + width: 100%; + border-collapse: collapse; + background: #ffffff; + border-radius: 12px; + overflow: hidden; + box-shadow: 0 8px 24px rgba(15, 23, 42, 0.08); +} + +.report-table th, +.report-table td { + padding: 12px 14px; + text-align: left; + border-bottom: 1px solid #e2e8f0; + font-size: 14px; +} + +.report-table th { + background: #f1f5f9; +} + +.report-table th button { + background: none; + border: none; + padding: 0; + font: inherit; + color: #0f172a; + cursor: pointer; +} + +.report-table tr:last-child td { + border-bottom: none; +} + +.error { + padding: 12px 16px; + border-radius: 8px; + background: #fee2e2; + color: #991b1b; +} + +.empty { + padding: 24px; + border-radius: 12px; + background: #ffffff; + color: #64748b; + border: 1px dashed #cbd5f5; +} diff --git a/viewer/tsconfig.json b/viewer/tsconfig.json new file mode 100644 index 0000000..e84f2af --- /dev/null +++ b/viewer/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2020", + "useDefineForClassFields": true, + "lib": ["ES2020", "DOM", "DOM.Iterable"], + "module": "ESNext", + "skipLibCheck": true, + "moduleResolution": "Bundler", + "allowImportingTsExtensions": true, + "resolveJsonModule": true, + "isolatedModules": true, + "noEmit": true, + "jsx": "react-jsx", + "strict": true + }, + "include": ["src"] +} diff --git a/viewer/vite.config.ts b/viewer/vite.config.ts new file mode 100644 index 0000000..9c9b6e4 --- /dev/null +++ b/viewer/vite.config.ts @@ -0,0 +1,6 @@ +import { defineConfig } from "vite" +import react from "@vitejs/plugin-react" + +export default defineConfig({ + plugins: [react()] +})