diff --git a/ontokit/api/routes/classes.py b/ontokit/api/routes/classes.py index 217e9ac1..e81dc499 100644 --- a/ontokit/api/routes/classes.py +++ b/ontokit/api/routes/classes.py @@ -3,8 +3,9 @@ from typing import Annotated from uuid import UUID -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, Query, status +from ontokit.schemas.graph import EntityGraphResponse from ontokit.schemas.owl_class import ( OWLClassCreate, OWLClassListResponse, @@ -57,6 +58,39 @@ async def create_class( return await service.create_class(ontology_id, owl_class) +@router.get( + "/ontologies/{ontology_id}/classes/graph", + response_model=EntityGraphResponse, +) +async def get_class_graph( + ontology_id: UUID, + service: Annotated[OntologyService, Depends(get_ontology_service)], + class_iri: str = Query(description="IRI of the class to build the graph around"), + branch: str = "main", + ancestors_depth: int = Query(default=5, ge=0, le=10), + descendants_depth: int = Query(default=2, ge=0, le=10), + max_nodes: int = Query(default=200, ge=1, le=500), + include_see_also: bool = True, +) -> EntityGraphResponse: + """Build a multi-hop entity graph around a class via BFS. + + Returns nodes and edges for visualization, with lineage-based node types + for ontology-agnostic coloring (root, ancestor, focus, descendant, etc.). + """ + result = await service.build_entity_graph( + ontology_id, + class_iri, + branch=branch, + ancestors_depth=ancestors_depth, + descendants_depth=descendants_depth, + max_nodes=max_nodes, + include_see_also=include_see_also, + ) + if result is None: + raise HTTPException(status_code=404, detail="Class not found") + return result + + @router.get("/ontologies/{ontology_id}/classes/{class_iri:path}", response_model=OWLClassResponse) async def get_class( ontology_id: UUID, @@ -97,26 +131,3 @@ async def delete_class( deleted = await service.delete_class(ontology_id, class_iri) if not deleted: raise HTTPException(status_code=404, detail="Class not found") - - -@router.get("/ontologies/{ontology_id}/classes/{class_iri:path}/hierarchy") -async def get_class_hierarchy( - ontology_id: UUID, - class_iri: str, - service: Annotated[OntologyService, Depends(get_ontology_service)], - direction: str = "both", - depth: int = 3, -) -> dict[str, object]: - """ - Get the class hierarchy around a specific class. - - Args: - direction: 'ancestors', 'descendants', or 'both' - depth: Maximum depth to traverse - """ - return await service.get_class_hierarchy( - ontology_id, - class_iri, - direction=direction, - depth=depth, - ) diff --git a/ontokit/api/routes/projects.py b/ontokit/api/routes/projects.py index 325b7bfd..b2bad732 100644 --- a/ontokit/api/routes/projects.py +++ b/ontokit/api/routes/projects.py @@ -27,6 +27,7 @@ from ontokit.models.branch_metadata import BranchMetadata from ontokit.models.pull_request import GitHubIntegration, PRStatus, PullRequest from ontokit.models.user_github_token import UserGitHubToken +from ontokit.schemas.graph import EntityGraphResponse from ontokit.schemas.owl_class import EntitySearchResponse, OWLClassResponse, OWLClassTreeResponse from ontokit.schemas.project import ( BranchCreate, @@ -629,6 +630,47 @@ async def get_ontology_tree_children( return OWLClassTreeResponse(nodes=nodes, total_classes=total_classes) +@router.get( + "/{project_id}/ontology/classes/graph", + response_model=EntityGraphResponse, +) +async def get_ontology_class_graph( + project_id: UUID, + service: Annotated[ProjectService, Depends(get_service)], + ontology: Annotated[OntologyService, Depends(get_ontology)], + git: Annotated[GitRepositoryService, Depends(get_git)], + user: OptionalUser, + class_iri: str = Query(description="IRI of the class to build the graph around"), + branch: str | None = Query(default=None, description="Branch to read from"), + ancestors_depth: int = Query(default=5, ge=0, le=10), + descendants_depth: int = Query(default=2, ge=0, le=10), + max_nodes: int = Query(default=200, ge=1, le=500), + include_see_also: bool = Query(default=True), +) -> EntityGraphResponse: + """Build a multi-hop entity graph around a class via BFS. + + Returns nodes and edges for visualization, with lineage-based node types. + """ + resolved_branch = branch or git.get_default_branch(project_id) + await _ensure_ontology_loaded(project_id, service, ontology, user, resolved_branch, git) + + result = await ontology.build_entity_graph( + project_id, + class_iri, + branch=resolved_branch, + ancestors_depth=ancestors_depth, + descendants_depth=descendants_depth, + max_nodes=max_nodes, + include_see_also=include_see_also, + ) + if result is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Class not found: {class_iri}", + ) + return result + + @router.get("/{project_id}/ontology/classes/{class_iri:path}", response_model=OWLClassResponse) async def get_ontology_class( project_id: UUID, diff --git a/ontokit/schemas/graph.py b/ontokit/schemas/graph.py new file mode 100644 index 00000000..2464530e --- /dev/null +++ b/ontokit/schemas/graph.py @@ -0,0 +1,40 @@ +"""Pydantic models for the Entity Graph API.""" + +from __future__ import annotations + +from pydantic import BaseModel + + +class GraphNode(BaseModel): + """A node in the entity graph.""" + + id: str + label: str + iri: str + definition: str | None = None + is_focus: bool = False + is_root: bool = False + depth: int = 0 + node_type: str = "class" + child_count: int | None = None + + +class GraphEdge(BaseModel): + """An edge in the entity graph.""" + + id: str + source: str + target: str + edge_type: str + label: str | None = None + + +class EntityGraphResponse(BaseModel): + """Complete graph response.""" + + focus_iri: str + focus_label: str + nodes: list[GraphNode] + edges: list[GraphEdge] + truncated: bool = False + total_concept_count: int = 0 diff --git a/ontokit/services/ontology.py b/ontokit/services/ontology.py index e31c731d..7a59f310 100644 --- a/ontokit/services/ontology.py +++ b/ontokit/services/ontology.py @@ -1,7 +1,10 @@ """Ontology service for managing OWL ontologies.""" +from __future__ import annotations + +from collections import deque from dataclasses import dataclass -from typing import Any, cast +from typing import TYPE_CHECKING, Any, cast from typing import Literal as TypingLiteral from uuid import UUID @@ -33,6 +36,9 @@ ) from ontokit.services.storage import StorageService +if TYPE_CHECKING: + from ontokit.schemas.graph import EntityGraphResponse + # Map file extensions to RDF formats FORMAT_MAP = { ".owl": "xml", @@ -58,6 +64,16 @@ # Default label preferences if none specified DEFAULT_LABEL_PREFERENCES = ["rdfs:label@en", "rdfs:label", "skos:prefLabel@en", "skos:prefLabel"] +# Namespaces treated as external (not part of the ontology being edited) +EXTERNAL_NAMESPACES = ( + "http://www.w3.org/2000/01/rdf-schema#", + "http://www.w3.org/2002/07/owl#", + "http://xmlns.com/foaf/0.1/", + "http://purl.org/dc/elements/1.1/", + "http://purl.org/dc/terms/", + "http://www.w3.org/2004/02/skos/core#", +) + # Common annotation properties to extract for class details # (excludes rdfs:label and rdfs:comment which are handled separately) ANNOTATION_PROPERTIES = { @@ -120,7 +136,7 @@ class LabelPreference: language: str | None # None means any language or no language tag @classmethod - def parse(cls, pref_string: str) -> "LabelPreference | None": + def parse(cls, pref_string: str) -> LabelPreference | None: """ Parse a preference string like 'rdfs:label@en' or 'skos:prefLabel'. @@ -340,16 +356,340 @@ async def delete_class(self, ontology_id: UUID, class_iri: str) -> bool: # TODO: Implement class deletion raise NotImplementedError("Class deletion pending") - async def get_class_hierarchy( + async def build_entity_graph( self, ontology_id: UUID, class_iri: str, - direction: str = "both", - depth: int = 3, - ) -> dict[str, Any]: - """Get class hierarchy around a specific class.""" - # TODO: Implement hierarchy traversal - raise NotImplementedError("Hierarchy implementation pending") + branch: str = "main", + ancestors_depth: int = 5, + descendants_depth: int = 2, + max_nodes: int = 200, + include_see_also: bool = True, + max_see_also_per_node: int = 5, + ) -> EntityGraphResponse | None: + """Build a multi-hop graph around a class via BFS. + + Traverses ancestors (subClassOf upward), descendants (subClassOf downward), + and optional seeAlso cross-links. Returns nodes with lineage-based types + for ontology-agnostic coloring. + """ + if max_nodes < 1: + raise ValueError("max_nodes must be at least 1") + if ancestors_depth < 0: + raise ValueError("ancestors_depth must be non-negative") + if descendants_depth < 0: + raise ValueError("descendants_depth must be non-negative") + if max_see_also_per_node < 0: + raise ValueError("max_see_also_per_node must be non-negative") + if not isinstance(include_see_also, bool): + raise ValueError("include_see_also must be a boolean") + + from ontokit.schemas.graph import EntityGraphResponse, GraphEdge, GraphNode + + graph = await self._get_graph(ontology_id, branch) + class_uri = URIRef(class_iri) + + if (class_uri, RDF.type, OWL.Class) not in graph: + return None + + owl_thing = OWL.Thing + + visited: dict[str, GraphNode] = {} + edges: list[GraphEdge] = [] + edge_ids: set[str] = set() + total_discovered = [0] + + def _get_local_name(iri: str) -> str: + if "#" in iri: + return iri.split("#")[-1] + return iri.rsplit("/", 1)[-1] + + def _get_label(uri: URIRef) -> str: + label = select_preferred_label(graph, uri) + return label if label else _get_local_name(str(uri)) + + def _is_external(iri: str) -> bool: + return any(iri.startswith(ns) for ns in EXTERNAL_NAMESPACES) + + def _is_root_class(uri: URIRef) -> bool: + parents = [ + p + for p in graph.objects(uri, RDFS.subClassOf) + if isinstance(p, URIRef) and p != owl_thing + ] + return len(parents) == 0 + + def _classify_node(uri: URIRef, is_focus: bool, _depth: int) -> str: + iri = str(uri) + if is_focus: + return "focus" + if _is_external(iri): + return "external" + # Check if individual (instance, not a class) + if (uri, RDF.type, OWL.Class) not in graph: + for rdf_type in graph.objects(uri, RDF.type): + if rdf_type in ( + OWL.ObjectProperty, + OWL.DatatypeProperty, + OWL.AnnotationProperty, + ): + return "property" + return "individual" + if _is_root_class(uri): + return "root" + return "class" + + def _get_definition(uri: URIRef) -> str | None: + # Try SKOS definition first, then rdfs:comment + for obj in graph.objects(uri, SKOS.definition): + if isinstance(obj, RDFLiteral): + return str(obj) + for obj in graph.objects(uri, RDFS.comment): + if isinstance(obj, RDFLiteral): + return str(obj) + return None + + def _child_count(uri: URIRef) -> int: + return sum( + 1 + for s in graph.subjects(RDFS.subClassOf, uri) + if isinstance(s, URIRef) and (s, RDF.type, OWL.Class) in graph + ) + + seen: set[str] = set() + + def _make_node(uri: URIRef, depth: int) -> GraphNode | None: + iri = str(uri) + if iri in visited: + return visited[iri] + if iri not in seen: + seen.add(iri) + total_discovered[0] += 1 + if len(visited) >= max_nodes: + return None + is_focus = uri == class_uri + node_type = _classify_node(uri, is_focus, depth) + is_root = _is_root_class(uri) if node_type in ("class", "root") else False + node = GraphNode( + id=iri, + label=_get_label(uri), + iri=iri, + definition=_get_definition(uri), + is_focus=is_focus, + is_root=is_root, + depth=depth, + node_type=node_type, + child_count=_child_count(uri), + ) + visited[iri] = node + return node + + def _add_edge(source: str, target: str, edge_type: str, label: str | None = None) -> bool: + eid = f"{source}->{target}:{edge_type}" + if eid in edge_ids: + return False + edge_ids.add(eid) + edges.append( + GraphEdge(id=eid, source=source, target=target, edge_type=edge_type, label=label) + ) + return True + + # Create focus node — always succeeds: class existence is verified above + # and visited dict is empty so max_nodes cannot be exceeded. + _make_node(class_uri, 0) + + # BFS upward (ancestors) + ancestor_queue: deque[tuple[URIRef, int]] = deque([(class_uri, 0)]) + ancestor_visited: set[str] = {class_iri} + while ancestor_queue: + current_uri, current_depth = ancestor_queue.popleft() + if current_depth >= ancestors_depth: + continue + for parent in graph.objects(current_uri, RDFS.subClassOf): + if not isinstance(parent, URIRef) or parent == owl_thing: + continue + parent_iri = str(parent) + parent_node = _make_node(parent, -(current_depth + 1)) + if parent_node is None: + continue + _add_edge(parent_iri, str(current_uri), "subClassOf") + if parent_iri not in ancestor_visited: + ancestor_visited.add(parent_iri) + ancestor_queue.append((parent, current_depth + 1)) + + # BFS downward (descendants) + descendant_queue: deque[tuple[URIRef, int]] = deque([(class_uri, 0)]) + descendant_visited: set[str] = {class_iri} + while descendant_queue: + current_uri, current_depth = descendant_queue.popleft() + if current_depth >= descendants_depth: + continue + for child in graph.subjects(RDFS.subClassOf, current_uri): + if not isinstance(child, URIRef): + continue + child_iri = str(child) + child_node = _make_node(child, current_depth + 1) + if child_node is None: + continue + _add_edge(str(current_uri), child_iri, "subClassOf") + if child_iri not in descendant_visited: + descendant_visited.add(child_iri) + descendant_queue.append((child, current_depth + 1)) + + # Collect equivalentClass and disjointWith for visited nodes + for node_iri in list(visited.keys()): + node_uri = URIRef(node_iri) + for equiv in graph.objects(node_uri, OWL.equivalentClass): + if isinstance(equiv, URIRef) and str(equiv) in visited: + if node_iri < str(equiv): + _add_edge(node_iri, str(equiv), "equivalentClass", "equivalentTo") + else: + _add_edge(str(equiv), node_iri, "equivalentClass", "equivalentTo") + for disj in graph.objects(node_uri, OWL.disjointWith): + if isinstance(disj, URIRef) and str(disj) in visited: + if node_iri < str(disj): + _add_edge(node_iri, str(disj), "disjointWith", "disjointWith") + else: + _add_edge(str(disj), node_iri, "disjointWith", "disjointWith") + + # Extract seeAlso targets from OWL restrictions on rdfs:seeAlso + def _get_see_also_targets(uri: URIRef) -> list[URIRef]: + """Extract seeAlso targets from both direct triples and OWL restrictions. + + FOLIO encodes seeAlso as owl:Restriction with owl:someValuesFrom + inside rdfs:subClassOf, not as direct rdfs:seeAlso triples. + """ + seen: set[URIRef] = set() + targets: list[URIRef] = [] + + def _add(ref: URIRef) -> None: + if ref not in seen: + seen.add(ref) + targets.append(ref) + + # Direct rdfs:seeAlso triples + for obj in graph.objects(uri, RDFS.seeAlso): + if isinstance(obj, URIRef): + _add(obj) + # OWL restrictions: subClassOf -> Restriction(onProperty=seeAlso, someValuesFrom=X) + for sc in graph.objects(uri, RDFS.subClassOf): + if isinstance(sc, URIRef): + continue # Named superclass, not a restriction + # sc is a blank node (restriction) + on_prop = next(graph.objects(sc, OWL.onProperty), None) + if on_prop == RDFS.seeAlso: + for val in graph.objects(sc, OWL.someValuesFrom): + if isinstance(val, URIRef): + _add(val) + for val in graph.objects(sc, OWL.allValuesFrom): + if isinstance(val, URIRef): + _add(val) + for val in graph.objects(sc, OWL.hasValue): + if isinstance(val, URIRef): + _add(val) + return targets + + def _get_see_also_referrers(uri: URIRef) -> list[URIRef]: + """Find classes that have seeAlso restrictions pointing TO this URI.""" + seen: set[URIRef] = set() + referrers: list[URIRef] = [] + + def _add(ref: URIRef) -> None: + if ref not in seen: + seen.add(ref) + referrers.append(ref) + + # Direct reverse rdfs:seeAlso + for subj in graph.subjects(RDFS.seeAlso, uri): + if isinstance(subj, URIRef): + _add(subj) + # Find restrictions that reference uri via someValuesFrom/allValuesFrom/hasValue + for predicate in (OWL.someValuesFrom, OWL.allValuesFrom, OWL.hasValue): + for restriction in graph.subjects(predicate, uri): + on_prop = next(graph.objects(restriction, OWL.onProperty), None) + if on_prop == RDFS.seeAlso: + for cls in graph.subjects(RDFS.subClassOf, restriction): + if isinstance(cls, URIRef) and (cls, RDF.type, OWL.Class) in graph: + _add(cls) + return referrers + + # Collect seeAlso cross-links + # Outgoing seeAlso: checked on all visited nodes (focus + ancestors) + # Incoming seeAlso: only checked on the focus node (intermediates are too noisy) + see_also_nodes: list[URIRef] = [] + if include_see_also: + for node_iri in list(visited.keys()): + node_uri = URIRef(node_iri) + sa_count = 0 + + # Outgoing: this node seeAlso -> related + for related in _get_see_also_targets(node_uri): + if sa_count >= max_see_also_per_node: + break + related_iri = str(related) + was_new = related_iri not in visited + if was_new: + related_node = _make_node(related, 0) + if related_node is None: + continue + # Always enqueue for ancestor traversal so seeAlso targets + # that were already visited (e.g. as descendants) still get + # their own ancestor branch explored. + see_also_nodes.append(related) + if _add_edge(node_iri, related_iri, "seeAlso", "rdfs:seeAlso"): + sa_count += 1 + + # Incoming: only on the focus node to avoid cascade + if node_uri == class_uri: + for referrer in _get_see_also_referrers(node_uri): + if sa_count >= max_see_also_per_node: + break + referrer_iri = str(referrer) + was_new = referrer_iri not in visited + if was_new: + referrer_node = _make_node(referrer, 0) + if referrer_node is None: + continue + see_also_nodes.append(referrer) + if _add_edge(referrer_iri, node_iri, "seeAlso", "rdfs:seeAlso"): + sa_count += 1 + + # BFS upward from seeAlso nodes to their roots + if see_also_nodes: + sa_queue: deque[tuple[URIRef, int]] = deque((u, 0) for u in see_also_nodes) + sa_visited: set[str] = {str(u) for u in see_also_nodes} | ancestor_visited + while sa_queue: + current_uri, current_depth = sa_queue.popleft() + if current_depth >= ancestors_depth: + continue + for parent in graph.objects(current_uri, RDFS.subClassOf): + if not isinstance(parent, URIRef) or parent == owl_thing: + continue + parent_iri = str(parent) + parent_node = _make_node(parent, -(current_depth + 1)) + if parent_node is None: + continue + _add_edge(parent_iri, str(current_uri), "subClassOf") + if parent_iri not in sa_visited: + sa_visited.add(parent_iri) + sa_queue.append((parent, current_depth + 1)) + + # Reclassify roots: primary roots (from subClassOf BFS) stay "root", + # roots discovered via seeAlso branches become "secondary_root" + for node in visited.values(): + if node.node_type == "root" and node.iri not in ancestor_visited: + node.node_type = "secondary_root" + + truncated = total_discovered[0] > len(visited) + + return EntityGraphResponse( + focus_iri=class_iri, + focus_label=_get_label(class_uri), + nodes=list(visited.values()), + edges=edges, + truncated=truncated, + total_concept_count=total_discovered[0], + ) async def get_root_classes( self, diff --git a/tests/unit/test_entity_graph.py b/tests/unit/test_entity_graph.py new file mode 100644 index 00000000..249ee1ec --- /dev/null +++ b/tests/unit/test_entity_graph.py @@ -0,0 +1,757 @@ +"""Tests for the build_entity_graph method on OntologyService.""" + +from __future__ import annotations + +import uuid + +import pytest +from rdflib import BNode, Graph, Literal, Namespace, URIRef +from rdflib.namespace import OWL, RDF, RDFS, SKOS + +from ontokit.services.ontology import OntologyService + +EX = Namespace("http://example.org/ontology#") +PROJECT_ID = uuid.UUID("12345678-1234-5678-1234-567812345678") +BRANCH = "main" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _service_with_graph(g: Graph) -> OntologyService: + svc = OntologyService(storage=None) + svc.set_graph(PROJECT_ID, BRANCH, g) + return svc + + +def _base_graph() -> Graph: + """Graph with a simple 3-level hierarchy: Animal > Person > Student.""" + g = Graph() + g.add((EX.Animal, RDF.type, OWL.Class)) + g.add((EX.Animal, RDFS.label, Literal("Animal", lang="en"))) + + g.add((EX.Person, RDF.type, OWL.Class)) + g.add((EX.Person, RDFS.label, Literal("Person", lang="en"))) + g.add((EX.Person, RDFS.subClassOf, EX.Animal)) + g.add((EX.Person, RDFS.comment, Literal("A human being", lang="en"))) + + g.add((EX.Student, RDF.type, OWL.Class)) + g.add((EX.Student, RDFS.label, Literal("Student", lang="en"))) + g.add((EX.Student, RDFS.subClassOf, EX.Person)) + + g.add((EX.GradStudent, RDF.type, OWL.Class)) + g.add((EX.GradStudent, RDFS.label, Literal("Graduate Student", lang="en"))) + g.add((EX.GradStudent, RDFS.subClassOf, EX.Student)) + return g + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestBuildEntityGraphBasic: + @pytest.mark.asyncio + async def test_returns_none_for_missing_class(self) -> None: + g = Graph() + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Missing), BRANCH) + assert result is None + + @pytest.mark.asyncio + async def test_focus_node(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + assert result.focus_iri == str(EX.Person) + assert result.focus_label == "Person" + + focus_nodes = [n for n in result.nodes if n.is_focus] + assert len(focus_nodes) == 1 + assert focus_nodes[0].node_type == "focus" + + @pytest.mark.asyncio + async def test_ancestors_discovered(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Animal) in iris + + @pytest.mark.asyncio + async def test_descendants_discovered(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Student) in iris + + @pytest.mark.asyncio + async def test_edges_created(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + sub_edges = [e for e in result.edges if e.edge_type == "subClassOf"] + assert len(sub_edges) >= 2 # Animal->Person, Person->Student + + @pytest.mark.asyncio + async def test_root_class_detected(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + animal_node = next(n for n in result.nodes if n.iri == str(EX.Animal)) + assert animal_node.is_root is True + assert animal_node.node_type == "root" + + @pytest.mark.asyncio + async def test_definition_from_comment(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + person = next(n for n in result.nodes if n.iri == str(EX.Person)) + assert person.definition == "A human being" + + @pytest.mark.asyncio + async def test_definition_from_skos(self) -> None: + g = _base_graph() + g.add((EX.Person, SKOS.definition, Literal("SKOS definition"))) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + person = next(n for n in result.nodes if n.iri == str(EX.Person)) + # SKOS definition takes priority + assert person.definition == "SKOS definition" + + @pytest.mark.asyncio + async def test_child_count(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + person = next(n for n in result.nodes if n.iri == str(EX.Person)) + assert person.child_count == 1 # Student + + +class TestBuildEntityGraphDepthLimits: + @pytest.mark.asyncio + async def test_ancestors_depth_limit(self) -> None: + """With ancestors_depth=0, no ancestors are traversed.""" + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Student), BRANCH, ancestors_depth=0 + ) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Animal) not in iris + assert str(EX.Person) not in iris + + @pytest.mark.asyncio + async def test_descendants_depth_limit(self) -> None: + """With descendants_depth=0, no descendants are traversed.""" + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Person), BRANCH, descendants_depth=0 + ) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Student) not in iris + + @pytest.mark.asyncio + async def test_descendants_depth_1(self) -> None: + """With descendants_depth=1, only direct children are found.""" + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Person), BRANCH, descendants_depth=1 + ) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Student) in iris + assert str(EX.GradStudent) not in iris + + +class TestBuildEntityGraphMaxNodes: + @pytest.mark.asyncio + async def test_truncation(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH, max_nodes=2) + assert result is not None + assert len(result.nodes) <= 2 + assert result.truncated is True + assert result.total_concept_count > len(result.nodes) + + @pytest.mark.asyncio + async def test_no_truncation(self) -> None: + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH, max_nodes=200) + assert result is not None + assert result.truncated is False + + +class TestBuildEntityGraphOwlRelations: + @pytest.mark.asyncio + async def test_equivalent_class(self) -> None: + """equivalentClass edges appear between two visited nodes.""" + g = _base_graph() + # Person and Student are both visited (ancestor/descendant). + g.add((EX.Person, OWL.equivalentClass, EX.Student)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + equiv_edges = [e for e in result.edges if e.edge_type == "equivalentClass"] + assert len(equiv_edges) == 1 + assert equiv_edges[0].label == "equivalentTo" + + @pytest.mark.asyncio + async def test_disjoint_with(self) -> None: + """disjointWith edges appear between two visited nodes.""" + g = _base_graph() + # Person and Animal are both visited (focus + ancestor). + g.add((EX.Person, OWL.disjointWith, EX.Animal)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + disj_edges = [e for e in result.edges if e.edge_type == "disjointWith"] + assert len(disj_edges) == 1 + assert disj_edges[0].label == "disjointWith" + + +class TestBuildEntityGraphSeeAlso: + @pytest.mark.asyncio + async def test_direct_see_also(self) -> None: + g = _base_graph() + g.add((EX.Related, RDF.type, OWL.Class)) + g.add((EX.Related, RDFS.label, Literal("Related"))) + g.add((EX.Person, RDFS.seeAlso, EX.Related)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Related) in iris + sa_edges = [e for e in result.edges if e.edge_type == "seeAlso"] + assert len(sa_edges) >= 1 + + @pytest.mark.asyncio + async def test_see_also_disabled(self) -> None: + g = _base_graph() + g.add((EX.Related, RDF.type, OWL.Class)) + g.add((EX.Person, RDFS.seeAlso, EX.Related)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Person), BRANCH, include_see_also=False + ) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Related) not in iris + + @pytest.mark.asyncio + async def test_owl_restriction_see_also(self) -> None: + """seeAlso encoded as OWL restriction (someValuesFrom) on subClassOf.""" + g = _base_graph() + g.add((EX.Related, RDF.type, OWL.Class)) + restriction = BNode() + g.add((restriction, RDF.type, OWL.Restriction)) + g.add((restriction, OWL.onProperty, RDFS.seeAlso)) + g.add((restriction, OWL.someValuesFrom, EX.Related)) + g.add((EX.Person, RDFS.subClassOf, restriction)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Related) in iris + + @pytest.mark.asyncio + async def test_owl_restriction_all_values_from(self) -> None: + """seeAlso encoded as OWL restriction (allValuesFrom).""" + g = _base_graph() + g.add((EX.Related, RDF.type, OWL.Class)) + restriction = BNode() + g.add((restriction, RDF.type, OWL.Restriction)) + g.add((restriction, OWL.onProperty, RDFS.seeAlso)) + g.add((restriction, OWL.allValuesFrom, EX.Related)) + g.add((EX.Person, RDFS.subClassOf, restriction)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Related) in iris + + @pytest.mark.asyncio + async def test_owl_restriction_has_value(self) -> None: + """seeAlso encoded as OWL restriction (hasValue).""" + g = _base_graph() + g.add((EX.Related, RDF.type, OWL.Class)) + restriction = BNode() + g.add((restriction, RDF.type, OWL.Restriction)) + g.add((restriction, OWL.onProperty, RDFS.seeAlso)) + g.add((restriction, OWL.hasValue, EX.Related)) + g.add((EX.Person, RDFS.subClassOf, restriction)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Related) in iris + + @pytest.mark.asyncio + async def test_incoming_see_also_on_focus(self) -> None: + """Reverse seeAlso — another class references the focus via seeAlso.""" + g = _base_graph() + g.add((EX.Referrer, RDF.type, OWL.Class)) + g.add((EX.Referrer, RDFS.seeAlso, EX.Person)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Referrer) in iris + + @pytest.mark.asyncio + async def test_incoming_restriction_see_also(self) -> None: + """Reverse seeAlso via OWL restriction (someValuesFrom -> focus).""" + g = _base_graph() + g.add((EX.Referrer, RDF.type, OWL.Class)) + restriction = BNode() + g.add((restriction, RDF.type, OWL.Restriction)) + g.add((restriction, OWL.onProperty, RDFS.seeAlso)) + g.add((restriction, OWL.someValuesFrom, EX.Person)) + g.add((EX.Referrer, RDFS.subClassOf, restriction)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Referrer) in iris + + @pytest.mark.asyncio + async def test_see_also_ancestors_traversed(self) -> None: + """After discovering seeAlso nodes, their ancestors are also traversed.""" + g = _base_graph() + # Create a separate branch: Category > Topic, Person seeAlso Topic + g.add((EX.Category, RDF.type, OWL.Class)) + g.add((EX.Topic, RDF.type, OWL.Class)) + g.add((EX.Topic, RDFS.subClassOf, EX.Category)) + g.add((EX.Person, RDFS.seeAlso, EX.Topic)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Topic) in iris + assert str(EX.Category) in iris + + @pytest.mark.asyncio + async def test_see_also_secondary_root(self) -> None: + """Roots discovered via seeAlso branches get 'secondary_root' type.""" + g = _base_graph() + g.add((EX.Category, RDF.type, OWL.Class)) + g.add((EX.Topic, RDF.type, OWL.Class)) + g.add((EX.Topic, RDFS.subClassOf, EX.Category)) + g.add((EX.Person, RDFS.seeAlso, EX.Topic)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + category = next(n for n in result.nodes if n.iri == str(EX.Category)) + assert category.node_type == "secondary_root" + + @pytest.mark.asyncio + async def test_see_also_max_per_node(self) -> None: + """max_see_also_per_node limits seeAlso targets collected per node.""" + g = _base_graph() + for i in range(10): + uri = URIRef(f"http://example.org/ontology#Related{i}") + g.add((uri, RDF.type, OWL.Class)) + g.add((EX.Person, RDFS.seeAlso, uri)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Person), BRANCH, max_see_also_per_node=3 + ) + assert result is not None + sa_edges = [e for e in result.edges if e.edge_type == "seeAlso"] + assert len(sa_edges) == 3 + + +class TestBuildEntityGraphClassification: + @pytest.mark.asyncio + async def test_external_namespace_classified(self) -> None: + g = _base_graph() + ext = URIRef("http://www.w3.org/2004/02/skos/core#Concept") + g.add((EX.Person, RDFS.seeAlso, ext)) + g.add((ext, RDF.type, OWL.Class)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + ext_node = next((n for n in result.nodes if n.iri == str(ext)), None) + assert ext_node is not None + assert ext_node.node_type == "external" + + @pytest.mark.asyncio + async def test_property_classified(self) -> None: + g = _base_graph() + g.add((EX.myProp, RDF.type, OWL.ObjectProperty)) + g.add((EX.Person, RDFS.seeAlso, EX.myProp)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + prop_node = next((n for n in result.nodes if n.iri == str(EX.myProp)), None) + assert prop_node is not None + assert prop_node.node_type == "property" + + @pytest.mark.asyncio + async def test_individual_classified(self) -> None: + g = _base_graph() + g.add((EX.john, RDF.type, EX.Person)) + g.add((EX.Person, RDFS.seeAlso, EX.john)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + ind_node = next((n for n in result.nodes if n.iri == str(EX.john)), None) + assert ind_node is not None + assert ind_node.node_type == "individual" + + @pytest.mark.asyncio + async def test_local_name_fallback_fragment(self) -> None: + """When no label exists, local name is extracted from fragment.""" + g = Graph() + ns = Namespace("http://example.org/ont#") + g.add((ns.MyClass, RDF.type, OWL.Class)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(ns.MyClass), BRANCH) + assert result is not None + assert result.focus_label == "MyClass" + + @pytest.mark.asyncio + async def test_local_name_fallback_slash(self) -> None: + """When no label exists, local name is extracted from last path segment.""" + g = Graph() + uri = URIRef("http://example.org/ontology/SlashClass") + g.add((uri, RDF.type, OWL.Class)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(uri), BRANCH) + assert result is not None + assert result.focus_label == "SlashClass" + + +class TestBuildEntityGraphValidation: + @pytest.mark.asyncio + async def test_max_nodes_zero_raises(self) -> None: + svc = _service_with_graph(_base_graph()) + with pytest.raises(ValueError, match="max_nodes must be at least 1"): + await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH, max_nodes=0) + + @pytest.mark.asyncio + async def test_negative_ancestors_depth_raises(self) -> None: + svc = _service_with_graph(_base_graph()) + with pytest.raises(ValueError, match="ancestors_depth must be non-negative"): + await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH, ancestors_depth=-1) + + @pytest.mark.asyncio + async def test_negative_descendants_depth_raises(self) -> None: + svc = _service_with_graph(_base_graph()) + with pytest.raises(ValueError, match="descendants_depth must be non-negative"): + await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH, descendants_depth=-1) + + @pytest.mark.asyncio + async def test_negative_max_see_also_per_node_raises(self) -> None: + svc = _service_with_graph(_base_graph()) + with pytest.raises(ValueError, match="max_see_also_per_node must be non-negative"): + await svc.build_entity_graph( + PROJECT_ID, str(EX.Person), BRANCH, max_see_also_per_node=-1 + ) + + @pytest.mark.asyncio + async def test_non_bool_include_see_also_raises(self) -> None: + svc = _service_with_graph(_base_graph()) + with pytest.raises(ValueError, match="include_see_also must be a boolean"): + await svc.build_entity_graph( + PROJECT_ID, + str(EX.Person), + BRANCH, + include_see_also="yes", # type: ignore[arg-type] + ) + + +class TestBuildEntityGraphIncomingRestrictions: + @pytest.mark.asyncio + async def test_incoming_restriction_all_values_from(self) -> None: + """Reverse seeAlso via OWL restriction (allValuesFrom -> focus).""" + g = _base_graph() + g.add((EX.Referrer, RDF.type, OWL.Class)) + restriction = BNode() + g.add((restriction, RDF.type, OWL.Restriction)) + g.add((restriction, OWL.onProperty, RDFS.seeAlso)) + g.add((restriction, OWL.allValuesFrom, EX.Person)) + g.add((EX.Referrer, RDFS.subClassOf, restriction)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Referrer) in iris + + @pytest.mark.asyncio + async def test_incoming_restriction_has_value(self) -> None: + """Reverse seeAlso via OWL restriction (hasValue -> focus).""" + g = _base_graph() + g.add((EX.Referrer, RDF.type, OWL.Class)) + restriction = BNode() + g.add((restriction, RDF.type, OWL.Restriction)) + g.add((restriction, OWL.onProperty, RDFS.seeAlso)) + g.add((restriction, OWL.hasValue, EX.Person)) + g.add((EX.Referrer, RDFS.subClassOf, restriction)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Referrer) in iris + + +class TestBuildEntityGraphEdgeCases: + @pytest.mark.asyncio + async def test_owl_thing_parent_skipped(self) -> None: + """owl:Thing parents should not appear as nodes.""" + g = _base_graph() + g.add((EX.Animal, RDFS.subClassOf, OWL.Thing)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(OWL.Thing) not in iris + + @pytest.mark.asyncio + async def test_duplicate_edges_prevented(self) -> None: + """Same edge should not appear twice.""" + svc = _service_with_graph(_base_graph()) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + edge_ids = [e.id for e in result.edges] + assert len(edge_ids) == len(set(edge_ids)) + + @pytest.mark.asyncio + async def test_already_visited_node_reused(self) -> None: + """If a node is discovered via both ancestor and descendant BFS, it's not duplicated.""" + g = _base_graph() + # Add a diamond: Student also subClassOf Animal (redundant) + g.add((EX.Student, RDFS.subClassOf, EX.Animal)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + animal_nodes = [n for n in result.nodes if n.iri == str(EX.Animal)] + assert len(animal_nodes) == 1 + + @pytest.mark.asyncio + async def test_max_nodes_truncates_ancestors(self) -> None: + """max_nodes limits ancestor BFS — covers _make_node returning None mid-BFS.""" + g = Graph() + # Deep chain: C0 > C1 > C2 > C3 > C4 (focus) + prev = EX.C0 + g.add((prev, RDF.type, OWL.Class)) + for i in range(1, 5): + uri = URIRef(f"http://example.org/ontology#C{i}") + g.add((uri, RDF.type, OWL.Class)) + g.add((uri, RDFS.subClassOf, prev)) + prev = uri + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.C4), BRANCH, max_nodes=3, ancestors_depth=10 + ) + assert result is not None + assert len(result.nodes) <= 3 + assert result.truncated is True + + @pytest.mark.asyncio + async def test_max_nodes_truncates_see_also(self) -> None: + """max_nodes reached during seeAlso collection — covers seeAlso _make_node None.""" + g = _base_graph() + for i in range(20): + uri = URIRef(f"http://example.org/ontology#SA{i}") + g.add((uri, RDF.type, OWL.Class)) + g.add((EX.Person, RDFS.seeAlso, uri)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, + str(EX.Person), + BRANCH, + max_nodes=5, + max_see_also_per_node=20, + ) + assert result is not None + assert len(result.nodes) <= 5 + assert result.truncated is True + + @pytest.mark.asyncio + async def test_max_nodes_truncates_see_also_ancestors(self) -> None: + """max_nodes reached during seeAlso ancestor BFS.""" + g = Graph() + g.add((EX.Focus, RDF.type, OWL.Class)) + # seeAlso target with a deep ancestor chain + g.add((EX.SATarget, RDF.type, OWL.Class)) + g.add((EX.Focus, RDFS.seeAlso, EX.SATarget)) + g.add((EX.SAParent, RDF.type, OWL.Class)) + g.add((EX.SATarget, RDFS.subClassOf, EX.SAParent)) + g.add((EX.SAGrandparent, RDF.type, OWL.Class)) + g.add((EX.SAParent, RDFS.subClassOf, EX.SAGrandparent)) + svc = _service_with_graph(g) + # max_nodes=3 means Focus + SATarget + SAParent; SAGrandparent is truncated + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Focus), BRANCH, max_nodes=3, ancestors_depth=10 + ) + assert result is not None + assert len(result.nodes) <= 3 + assert result.truncated is True + + @pytest.mark.asyncio + async def test_see_also_ancestor_depth_limit(self) -> None: + """seeAlso ancestor BFS respects ancestors_depth.""" + g = _base_graph() + g.add((EX.Category, RDF.type, OWL.Class)) + g.add((EX.Topic, RDF.type, OWL.Class)) + g.add((EX.Topic, RDFS.subClassOf, EX.Category)) + g.add((EX.SuperCategory, RDF.type, OWL.Class)) + g.add((EX.Category, RDFS.subClassOf, EX.SuperCategory)) + g.add((EX.Person, RDFS.seeAlso, EX.Topic)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH, ancestors_depth=1) + assert result is not None + iris = {n.iri for n in result.nodes} + # Topic found via seeAlso, Category via 1-deep ancestor BFS, but SuperCategory is beyond + assert str(EX.Topic) in iris + assert str(EX.Category) in iris + assert str(EX.SuperCategory) not in iris + + @pytest.mark.asyncio + async def test_equivalentclass_reverse_direction(self) -> None: + """equivalentClass edge uses the reverse direction when IRIs are ordered differently.""" + g = _base_graph() + # Animal < Person alphabetically, so edge goes Animal->Person when Person is first arg + # But if we add equivalentClass from Animal to Person, and Animal < Person, + # the code checks node_iri < str(equiv) — make sure both directions are exercised + g.add((EX.Animal, OWL.equivalentClass, EX.Person)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + equiv_edges = [e for e in result.edges if e.edge_type == "equivalentClass"] + assert len(equiv_edges) >= 1 + + @pytest.mark.asyncio + async def test_disjointwith_forward_direction(self) -> None: + """disjointWith edge direction when node_iri < disjoint IRI.""" + g = _base_graph() + # Animal < Student alphabetically + g.add((EX.Animal, OWL.disjointWith, EX.Student)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + disj_edges = [e for e in result.edges if e.edge_type == "disjointWith"] + assert len(disj_edges) >= 1 + + @pytest.mark.asyncio + async def test_non_uriref_child_skipped(self) -> None: + """BNode children in subClassOf are skipped during descendant BFS.""" + g = _base_graph() + bnode = BNode() + g.add((bnode, RDFS.subClassOf, EX.Person)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + # BNode should not appear as a node + for node in result.nodes: + assert not node.iri.startswith("_:") + + @pytest.mark.asyncio + async def test_incoming_see_also_budget_exhausted(self) -> None: + """Incoming seeAlso referrers respect max_see_also_per_node budget.""" + g = _base_graph() + for i in range(10): + uri = URIRef(f"http://example.org/ontology#Ref{i}") + g.add((uri, RDF.type, OWL.Class)) + g.add((uri, RDFS.seeAlso, EX.Person)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, + str(EX.Person), + BRANCH, + max_see_also_per_node=2, + include_see_also=True, + ) + assert result is not None + sa_edges = [e for e in result.edges if e.edge_type == "seeAlso"] + assert len(sa_edges) == 2 + + @pytest.mark.asyncio + async def test_duplicate_see_also_edge_not_counted(self) -> None: + """Duplicate seeAlso edge doesn't consume budget, leaving room for other targets.""" + g = _base_graph() + # Animal (ancestor of Person) also has seeAlso to the same target as Person, + # so when we iterate visited nodes, both Person and Animal try to add + # a seeAlso edge to EX.Shared. The second _add_edge returns False (duplicate) + # and should not consume the budget. + g.add((EX.Shared, RDF.type, OWL.Class)) + g.add((EX.Person, RDFS.seeAlso, EX.Shared)) + g.add((EX.Animal, RDFS.seeAlso, EX.Shared)) + # Add a second target only reachable from Animal — if the duplicate edge + # to Shared wrongly consumed Animal's budget, this one would be blocked. + g.add((EX.Other, RDF.type, OWL.Class)) + g.add((EX.Animal, RDFS.seeAlso, EX.Other)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Person), BRANCH, max_see_also_per_node=2 + ) + assert result is not None + iris = {n.iri for n in result.nodes} + assert str(EX.Shared) in iris + assert str(EX.Other) in iris + + @pytest.mark.asyncio + async def test_visited_node_reused_in_descendant_diamond(self) -> None: + """_make_node returns cached node when a descendant is reachable via two paths.""" + g = Graph() + # Focus has two children A and B; both are parents of Shared + g.add((EX.Focus, RDF.type, OWL.Class)) + g.add((EX.A, RDF.type, OWL.Class)) + g.add((EX.A, RDFS.subClassOf, EX.Focus)) + g.add((EX.B, RDF.type, OWL.Class)) + g.add((EX.B, RDFS.subClassOf, EX.Focus)) + g.add((EX.Shared, RDF.type, OWL.Class)) + g.add((EX.Shared, RDFS.subClassOf, EX.A)) + g.add((EX.Shared, RDFS.subClassOf, EX.B)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, str(EX.Focus), BRANCH, descendants_depth=3 + ) + assert result is not None + shared_nodes = [n for n in result.nodes if n.iri == str(EX.Shared)] + assert len(shared_nodes) == 1 + + @pytest.mark.asyncio + async def test_equivalentclass_both_directions(self) -> None: + """equivalentClass edges cover both ordering branches.""" + g = _base_graph() + # Add equivalentClass where the lexicographic ordering ensures we hit both branches. + # Animal iri < Person iri, so when iterating from Animal: node_iri < str(equiv) + # When iterating from Person with equiv=Animal: node_iri > str(equiv) → else branch + g.add((EX.Person, OWL.equivalentClass, EX.Animal)) + g.add((EX.Animal, OWL.equivalentClass, EX.Person)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph(PROJECT_ID, str(EX.Person), BRANCH) + assert result is not None + equiv_edges = [e for e in result.edges if e.edge_type == "equivalentClass"] + # Deduplication means only 1 edge regardless of direction + assert len(equiv_edges) == 1 + + @pytest.mark.asyncio + async def test_max_nodes_truncates_incoming_referrer(self) -> None: + """max_nodes reached during incoming seeAlso referrer collection.""" + g = Graph() + g.add((EX.Focus, RDF.type, OWL.Class)) + # Add many referrers pointing to Focus + for i in range(10): + uri = URIRef(f"http://example.org/ontology#Ref{i}") + g.add((uri, RDF.type, OWL.Class)) + g.add((uri, RDFS.seeAlso, EX.Focus)) + svc = _service_with_graph(g) + result = await svc.build_entity_graph( + PROJECT_ID, + str(EX.Focus), + BRANCH, + max_nodes=3, + max_see_also_per_node=20, + ) + assert result is not None + assert len(result.nodes) <= 3 + assert result.truncated is True diff --git a/tests/unit/test_graph_routes.py b/tests/unit/test_graph_routes.py new file mode 100644 index 00000000..98d5110a --- /dev/null +++ b/tests/unit/test_graph_routes.py @@ -0,0 +1,143 @@ +"""Tests for entity graph route handlers.""" + +from __future__ import annotations + +import uuid +from collections.abc import Generator +from unittest.mock import AsyncMock, MagicMock + +import pytest +from fastapi.testclient import TestClient + +from ontokit.api.routes.classes import get_ontology_service +from ontokit.api.routes.projects import get_git, get_ontology, get_service +from ontokit.main import app +from ontokit.schemas.graph import EntityGraphResponse, GraphNode + +PROJECT_ID = uuid.UUID("12345678-1234-5678-1234-567812345678") +FOCUS_IRI = "http://example.org/ontology#Person" + + +def _sample_graph_response() -> EntityGraphResponse: + return EntityGraphResponse( + focus_iri=FOCUS_IRI, + focus_label="Person", + nodes=[ + GraphNode(id=FOCUS_IRI, label="Person", iri=FOCUS_IRI, is_focus=True, node_type="focus") + ], + edges=[], + truncated=False, + total_concept_count=1, + ) + + +# --------------------------------------------------------------------------- +# classes.py — GET /api/v1/ontologies/{id}/classes/graph +# --------------------------------------------------------------------------- + + +class TestClassesGraphRoute: + @pytest.fixture + def mock_ontology_svc(self) -> Generator[AsyncMock, None, None]: + mock_svc = AsyncMock() + app.dependency_overrides[get_ontology_service] = lambda: mock_svc + try: + yield mock_svc + finally: + app.dependency_overrides.pop(get_ontology_service, None) + + def test_graph_success(self, mock_ontology_svc: AsyncMock) -> None: + mock_ontology_svc.build_entity_graph = AsyncMock(return_value=_sample_graph_response()) + client = TestClient(app, raise_server_exceptions=False) + resp = client.get( + f"/api/v1/ontologies/{PROJECT_ID}/classes/graph", + params={"class_iri": FOCUS_IRI}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["focus_iri"] == FOCUS_IRI + assert len(data["nodes"]) == 1 + + def test_graph_not_found(self, mock_ontology_svc: AsyncMock) -> None: + mock_ontology_svc.build_entity_graph = AsyncMock(return_value=None) + client = TestClient(app, raise_server_exceptions=False) + resp = client.get( + f"/api/v1/ontologies/{PROJECT_ID}/classes/graph", + params={"class_iri": "http://example.org/Missing"}, + ) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# projects.py — GET /api/v1/projects/{id}/ontology/classes/graph +# --------------------------------------------------------------------------- + + +class TestProjectsGraphRoute: + @pytest.fixture + def mock_services( + self, + authed_client: tuple[TestClient, AsyncMock], + ) -> Generator[tuple[TestClient, AsyncMock, MagicMock, AsyncMock], None, None]: + client, _db = authed_client + + mock_project_svc = AsyncMock() + mock_project_svc.get = AsyncMock( + return_value=MagicMock(source_file_path="ontology.ttl", label_preferences=None) + ) + + mock_onto = AsyncMock() + mock_git = MagicMock() + mock_git.get_default_branch = MagicMock(return_value="main") + + app.dependency_overrides[get_service] = lambda: mock_project_svc + app.dependency_overrides[get_ontology] = lambda: mock_onto + app.dependency_overrides[get_git] = lambda: mock_git + try: + yield client, mock_onto, mock_git, mock_project_svc + finally: + app.dependency_overrides.pop(get_service, None) + app.dependency_overrides.pop(get_ontology, None) + app.dependency_overrides.pop(get_git, None) + + def test_graph_success( + self, + mock_services: tuple[TestClient, AsyncMock, MagicMock, AsyncMock], + ) -> None: + client, mock_onto, _git, _proj = mock_services + mock_onto.build_entity_graph = AsyncMock(return_value=_sample_graph_response()) + resp = client.get( + f"/api/v1/projects/{PROJECT_ID}/ontology/classes/graph", + params={"class_iri": FOCUS_IRI}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["focus_iri"] == FOCUS_IRI + + def test_graph_not_found( + self, + mock_services: tuple[TestClient, AsyncMock, MagicMock, AsyncMock], + ) -> None: + client, mock_onto, _git, _proj = mock_services + mock_onto.build_entity_graph = AsyncMock(return_value=None) + resp = client.get( + f"/api/v1/projects/{PROJECT_ID}/ontology/classes/graph", + params={"class_iri": "http://example.org/Missing"}, + ) + assert resp.status_code == 404 + + def test_graph_uses_default_branch( + self, + mock_services: tuple[TestClient, AsyncMock, MagicMock, AsyncMock], + ) -> None: + client, mock_onto, mock_git, _proj = mock_services + mock_onto.build_entity_graph = AsyncMock(return_value=_sample_graph_response()) + mock_git.get_default_branch = MagicMock(return_value="develop") + resp = client.get( + f"/api/v1/projects/{PROJECT_ID}/ontology/classes/graph", + params={"class_iri": FOCUS_IRI}, + ) + assert resp.status_code == 200 + mock_onto.build_entity_graph.assert_called_once() + call_kwargs = mock_onto.build_entity_graph.call_args[1] + assert call_kwargs["branch"] == "develop"