From 386ad6e22856d13916d3926e0a1a14a7d7df9192 Mon Sep 17 00:00:00 2001 From: cafzal Date: Mon, 15 Jun 2026 20:14:58 -0700 Subject: [PATCH 01/14] energy_grid_planning: add Stage 2.5 paths (transmission corridors + contingency), bump to 1.13 WIP checkpoint - code + pin + docstring. README/runbook + full-template live-run pending. Stage 2.5 composes on Stage 2 betweenness; logic live-validated standalone (421 corridors, max betweenness-load 99.833, top-substation removal reroutes). --- .../energy_grid_planning.py | 125 ++++++++++++++++++ v1/energy_grid_planning/pyproject.toml | 2 +- 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/v1/energy_grid_planning/energy_grid_planning.py b/v1/energy_grid_planning/energy_grid_planning.py index 6078cd9..17e4499 100644 --- a/v1/energy_grid_planning/energy_grid_planning.py +++ b/v1/energy_grid_planning/energy_grid_planning.py @@ -11,6 +11,10 @@ detection, and multi-metric centrality (betweenness, degree, eigenvector) on the transmission grid topology. Results are stored directly as Substation properties on the shared ontology. +- Stage 2.5 -- Paths (PREVIEW, relationalai>=1.13): enumerate generator-sub -> + DC-sub transmission corridors, rank each by Stage 2 betweenness summed along + the route (most fragile = greatest through-traffic exposure), and re-enumerate + with the highest-betweenness substation offline. Persists Substation.fragility_load. - Stage 3 -- Rules: declarative interconnection queue compliance checks (capacity, structural criticality, low-carbon mandate) that consume Stage 1 and 2 enrichments. @@ -26,6 +30,8 @@ - Stage 1: substation load forecasts with growth rates and breach detection - Stage 2: grid connectivity (WCC), community structure (Louvain), centrality ranking, and structurally critical substations + - Stage 2.5: most-fragile generator-to-DC transmission corridors (betweenness + summed along the route) + a highest-betweenness-offline contingency - Stage 3: compliance table (10 DC requests vs 3 rules: capacity, low-carbon, structural risk) - Stage 4: Pareto frontier across 5 investment levels ($200M-$600M) with @@ -649,6 +655,125 @@ def load_csv(filename): else: print("\n No DC requests target structurally critical substations.") +# -------------------------------------------------- +# Stage 2.5: Paths -- Transmission Corridors & Contingency +# PREVIEW capability; requires relationalai>=1.13. +# -------------------------------------------------- +# Composes on Stage 2's Substation.betweenness. Where Stage 2 scores a *node*, +# paths scores the *corridor* feeding each data center: enumerate generator-sub -> +# DC-sub transmission routes, rank each by total betweenness summed along its hops +# (most fragile = greatest through-traffic exposure), then re-enumerate with the +# highest-betweenness substation offline to see which DC substations reroute. + +print(f"\n{'=' * 60}") +print("STAGE 2.5: PATHS -- Transmission Corridors & Contingency") +print("=" * 60) + +# Bidirectional Substation<->Substation grid edge from active transmission lines. +Substation.connects_to = model.Relationship( + f"{Substation} connects to {Substation}", short_name="connects_to" +) +tl_fwd = TransmissionLine.ref() +s_from, s_to = Substation.ref(), Substation.ref() +model.where( + tl_fwd.from_substation(s_from), + tl_fwd.to_substation(s_to), + tl_fwd.is_active == True, +).define(s_from.connects_to(s_to)) +tl_rev = TransmissionLine.ref() +s_a, s_b = Substation.ref(), Substation.ref() +model.where( + tl_rev.from_substation(s_a), + tl_rev.to_substation(s_b), + tl_rev.is_active == True, +).define(s_b.connects_to(s_a)) + +# Typed endpoints: substations hosting a generator (source) and a DC request (sink). +GeneratorSubstation = model.Concept("GeneratorSubstation", extends=[Substation]) +gen_ref, gen_sub = Generator.ref(), Substation.ref() +model.where(gen_ref.substation(gen_sub)).define(GeneratorSubstation(gen_sub)) +DCSubstation = model.Concept("DCSubstation", extends=[Substation]) +dc_ref, dc_sub = DataCenterRequest.ref(), Substation.ref() +model.where(dc_ref.substation(dc_sub)).define(DCSubstation(dc_sub)) + +# Enumerate generator-sub -> DC-sub corridors (finite repeat; the grid is meshed/cyclic). +MAX_CORRIDOR_HOPS = 6 +corridor_src, corridor_dst = GeneratorSubstation.ref(), DCSubstation.ref() +corridor_df = model.where( + corridor := model.path( + corridor_src, Substation.connects_to.repeat(1, MAX_CORRIDOR_HOPS), corridor_dst + ).all_paths(), +).select( + corridor.alias("corridor"), + corridor.nodes["index"].alias("hop"), + Substation(corridor.nodes).id.alias("substation_id"), + Substation(corridor.nodes).name.alias("substation_name"), +).to_df() + +# Weight each corridor by Stage 2's betweenness, summed along its substations. +betw_df = model.select( + Substation.id.alias("substation_id"), + Substation.name.alias("substation_name"), + Substation.betweenness.alias("betweenness"), +).to_df() +betw_by_id = dict(zip(betw_df["substation_id"], betw_df["betweenness"].fillna(0.0))) +name_by_id = dict(zip(betw_df["substation_id"], betw_df["substation_name"])) +corridor_df["hop"] = corridor_df["hop"].astype(int) +corridor_df["betweenness"] = corridor_df["substation_id"].map(betw_by_id).fillna(0.0) + + +def most_fragile_corridor_per_dc(removed_id=None): + """Highest total-betweenness SIMPLE corridor terminating at each DC substation.""" + best = {} + for _, grp in corridor_df.groupby("corridor"): + ordered = grp.sort_values("hop") + ids = ordered["substation_id"].tolist() + if len(set(ids)) != len(ids): + continue # simple paths only (the grid is cyclic) + if removed_id is not None and removed_id in ids: + continue + load = round(float(ordered["betweenness"].sum()), 3) + dest = ids[-1] + if dest not in best or load > best[dest][1]: + best[dest] = (ordered["substation_name"].tolist(), load) + return best + + +baseline_corridors = most_fragile_corridor_per_dc() +n_corridors = corridor_df["corridor"].nunique() +print( + f"\n {n_corridors} generator-sub -> DC-sub corridors (<= {MAX_CORRIDOR_HOPS} hops, simple); " + f"most-fragile corridor for {len(baseline_corridors)} DC substation(s):" +) +for dest, (route, load) in sorted(baseline_corridors.items(), key=lambda kv: -kv[1][1])[:5]: + print(f" betweenness-load {load}: " + " -> ".join(route)) + +# Persist each DC substation's most-fragile-corridor load back to the ontology. +Substation.fragility_load = model.Property(f"{Substation} has {Float:fragility_load}") +frag_rows = pd.DataFrame( + [{"substation_id": dest, "fragility_load": load} for dest, (_, load) in baseline_corridors.items()] +) +if not frag_rows.empty: + frag_data = model.data(frag_rows) + model.define(Substation.fragility_load(frag_data.fragility_load)).where( + Substation.id == frag_data.substation_id + ) + +# Contingency: take the highest-betweenness substation offline and re-enumerate. +if betw_by_id: + top_id = max(betw_by_id, key=betw_by_id.get) + after_corridors = most_fragile_corridor_per_dc(removed_id=top_id) + # NB: `sum` is shadowed by the relationalai import, so count with len([...]). + rerouted = len( + [d for d in baseline_corridors if d in after_corridors and baseline_corridors[d][0] != after_corridors[d][0]] + ) + lost = [d for d in baseline_corridors if d not in after_corridors] + print( + f"\n Contingency -- highest-betweenness substation offline " + f"({name_by_id.get(top_id, top_id)}): {len(baseline_corridors)} -> {len(after_corridors)} " + f"DC substations served, {rerouted} reroute, {len(lost)} lose all corridors." + ) + # -------------------------------------------------- # Stage 3: Rules -- Interconnection Queue Compliance # -------------------------------------------------- diff --git a/v1/energy_grid_planning/pyproject.toml b/v1/energy_grid_planning/pyproject.toml index f2f4d3f..cc9fcf6 100644 --- a/v1/energy_grid_planning/pyproject.toml +++ b/v1/energy_grid_planning/pyproject.toml @@ -9,7 +9,7 @@ description = "RelationalAI template: energy_grid_planning (PyRel v1)" readme = "README.md" requires-python = ">=3.10" dependencies = [ - "relationalai==1.11.0", + "relationalai==1.13.0", "pandas>=2.0", ] From 4f70f2725a7d8c31177d66356136e531d529f8fc Mon Sep 17 00:00:00 2001 From: cafzal Date: Mon, 15 Jun 2026 20:28:27 -0700 Subject: [PATCH 02/14] energy_grid_planning: README + runbook for Stage 2.5 paths - How it works: Stage 2.5 subsection (transmission corridors + contingency) with verbatim snippet; chain-composition bullet for Substation.fragility_load. - Runbook: step 5b 'Trace fragile transmission corridors' (question-shaped, betweenness anchored by structural test; 421 corridors, load 99.833, DFW contingency reroutes). --- v1/energy_grid_planning/README.md | 22 +++++++++++++++++++++- v1/energy_grid_planning/runbook.md | 12 ++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/v1/energy_grid_planning/README.md b/v1/energy_grid_planning/README.md index 4381095..f44227c 100644 --- a/v1/energy_grid_planning/README.md +++ b/v1/energy_grid_planning/README.md @@ -35,7 +35,8 @@ This template uses RelationalAI's **predictive reasoning**, **graph analysis**, Each stage enriches the shared ontology, and downstream stages consume those enrichments -- this is the **accretive ontology enrichment** pattern. No Python dicts or DataFrames carry state between stages; the ontology is the single source of truth: - **Stage 1 writes** `Substation.predicted_load` -- consumed by Stage 3's capacity rule AND Stage 4's capacity constraint. Both downstream reasoners see the same forecasted headroom. -- **Stage 2 writes** `Substation.betweenness`, `Substation.grid_community`, `Substation.is_structurally_critical` -- consumed by Stage 3's structural risk rule (Rule 2). +- **Stage 2 writes** `Substation.betweenness`, `Substation.grid_community`, `Substation.is_structurally_critical` -- consumed by Stage 3's structural risk rule (Rule 2) and by Stage 2.5's corridor ranking. +- **Stage 2.5 writes** `Substation.fragility_load` (PREVIEW) -- the most-fragile generator-to-DC corridor's betweenness load per data-center substation, consuming Stage 2's betweenness along enumerated routes. - **Stage 3 writes** `DataCenterRequest.fails_capacity`, `.fails_structural`, `.fails_low_carbon`, `.is_compliant` -- queryable compliance flags that document why each request was flagged. - **Stage 4 writes** `DataCenterRequest.x_approve` and `SubstationUpgrade.x_upgrade` per `InvestmentLevel` -- queried from the ontology via `model.select()`, not parsed from solver output. @@ -285,6 +286,25 @@ community = grid_graph.louvain() betweenness = grid_graph.betweenness_centrality() ``` +### Stage 2.5: Paths -- Transmission Corridors & Contingency + +> PREVIEW capability; requires `relationalai>=1.13`. + +Where Stage 2 scores a *substation*, the **Graph** paths capability scores the *corridor* feeding each data center. It derives a bidirectional substation-to-substation edge from active transmission lines, enumerates generator-substation to DC-substation routes, and ranks each by the Stage 2 betweenness summed along its hops — the most fragile corridor is the one carrying the greatest through-traffic exposure. A contingency pass removes the highest-betweenness substation and re-enumerates to show which data centers reroute. The most-fragile load is persisted as `Substation.fragility_load`. + +```python +corridor_df = model.where( + corridor := model.path( + corridor_src, Substation.connects_to.repeat(1, MAX_CORRIDOR_HOPS), corridor_dst + ).all_paths(), +).select( + corridor.alias("corridor"), + corridor.nodes["index"].alias("hop"), + Substation(corridor.nodes).id.alias("substation_id"), + Substation(corridor.nodes).name.alias("substation_name"), +).to_df() +``` + ### Stage 3: Rules -- Interconnection Queue Compliance Three declarative rules (RAI Relationships) consume upstream outputs: diff --git a/v1/energy_grid_planning/runbook.md b/v1/energy_grid_planning/runbook.md index 3306d5c..19b0ceb 100644 --- a/v1/energy_grid_planning/runbook.md +++ b/v1/energy_grid_planning/runbook.md @@ -98,6 +98,18 @@ Plan routing sub-questions to predictive, graph, rules, and prescriptive reasone 1 connected component, 3 Louvain communities (North Texas, West Texas, Gulf Coast); DFW, Houston, San Antonio flagged `is_structurally_critical`; 7 of 10 DC requests target critical nodes. +### 5b. Trace fragile transmission corridors (PREVIEW, requires `relationalai>=1.13`) + +**Prompt** + +``` +/rai-graph-analysis For each data-center substation, what is the most fragile transmission corridor from a generator substation — the route running through the greatest total structural criticality across its substations, where a substation's criticality reflects how many power-flow paths route through it? Consider corridors up to about six hops that don't revisit a substation. Then, if the single most critical substation goes offline, which data-center substations reroute and which lose every corridor? +``` + +**Response** + +421 generator-substation to DC-substation corridors. The most fragile carries a betweenness-load of 99.833, routing through the Dallas-Fort Worth / Abilene Central / Houston Ship Channel hubs. Taking the top substation (Dallas-Fort Worth) offline reroutes the other DC substations' corridors; the meshed grid stays connected (no full isolation). Persists `Substation.fragility_load`. + ### 6. Screen DC requests **Prompt** From 59cbaa7a7b05e72a519da882260906488ef9770f Mon Sep 17 00:00:00 2001 From: cafzal Date: Mon, 15 Jun 2026 20:30:54 -0700 Subject: [PATCH 03/14] bom-reachability: add assembly-path enumeration (PREVIEW) + bump to 1.13 Derive SKU.feeds (input_sku -> output_sku) from BillOfMaterials, enumerate assembly paths on the BOM DAG, add a maximal-paths view, persist SKU.assembly_depth. Mirrors the live-validated Q4 pattern (18 paths -> finished goods). py_compile + ruff green; full-template live-run is the pre-merge gate. --- v1/bom-reachability/bom_reachability.py | 130 +++++++++++++++++++++++- v1/bom-reachability/pyproject.toml | 2 +- 2 files changed, 128 insertions(+), 4 deletions(-) diff --git a/v1/bom-reachability/bom_reachability.py b/v1/bom-reachability/bom_reachability.py index 7cac08d..f9be0af 100644 --- a/v1/bom-reachability/bom_reachability.py +++ b/v1/bom-reachability/bom_reachability.py @@ -10,18 +10,29 @@ Algorithms: reachable(full=True) for transitive dependency tracing, betweenness_centrality() for identifying structural bottlenecks. +Assembly path enumeration (PREVIEW, requires relationalai>=1.13): enumerate the + bottom-up assembly chains that build each finished good. Derives a binary + SKU->SKU "feeds into" edge from the BillOfMaterials intermediary (input feeds + output), enumerates every assembly path with model.path(...).all_paths(), + filters to the maximal (longest non-extendable) chains so sub-paths are + suppressed, and persists each finished good's longest assembly depth back onto + the SKU ontology. + Run: `python bom_reachability.py` Output: - Prints transitive dependency lists per finished product and a betweenness - centrality ranking that flags structural bottleneck components. + Prints transitive dependency lists per finished product, a betweenness + centrality ranking that flags structural bottleneck components, and the + maximal raw-material -> finished-good assembly chains with per-finished-good + assembly depth. """ from pathlib import Path +import pandas as pd from pandas import read_csv -from relationalai.semantics import Float, Model, String, where +from relationalai.semantics import Float, Integer, Model, String, where from relationalai.semantics.reasoners.graph import Graph model = Model("bom_reachability") @@ -162,3 +173,116 @@ top = bottlenecks.iloc[0] print(f"\nTop bottleneck: {top['sku_name']} (betweenness={top['betweenness']:.4f})") print(" Sits on the most dependency paths -- disruption here affects the most product lines.") + +# -------------------------------------------------- +# Assembly path enumeration +# PREVIEW capability; requires relationalai>=1.13. +# -------------------------------------------------- +# Where betweenness scores a single *node*, this enumerates the full *chains* +# that build each finished good: every bottom-up assembly path from a raw +# material up through its components to the finished good. We derive a binary +# SKU->SKU "feeds into" edge from the BillOfMaterials intermediary (the input +# SKU feeds the output SKU), enumerate all such paths, then keep only the +# maximal (longest, non-extendable) chains so that sub-paths are suppressed. + +print("\n=== Assembly Path Enumeration (PREVIEW) ===") + +# Binary SKU->SKU edge: input_sku "feeds into" output_sku. This is the +# build-direction (bottom-up) reverse of the depends-on graph edge above. +SKU.feeds = model.Relationship(f"{SKU} feeds into {SKU}", short_name="feeds") +bom_ref = BillOfMaterials.ref() +sku_in, sku_out = SKU.ref(), SKU.ref() +model.where( + bom_ref.input_sku(sku_in), + bom_ref.output_sku(sku_out), +).define(sku_in.feeds(sku_out)) + +# Enumerate every assembly chain. The BOM is a DAG (raw materials -> components +# -> finished goods, never back), so .all_paths() already yields simple paths -- +# there is no cycle risk, and the repeat bound only needs to cover the BOM depth. +# Deepest chain here is raw_material -> component -> finished_good (2 hops); we +# allow extra headroom so deeper BOMs enumerate fully. +MAX_ASSEMBLY_HOPS = 4 +p = model.path(SKU.feeds.repeat(1, MAX_ASSEMBLY_HOPS)).all_paths() +assembly_df = ( + model.where(p) + .select( + p.alias("path"), + p.nodes["index"].alias("step"), + SKU(p.nodes).id.alias("sku_id"), + SKU(p.nodes).name.alias("sku_name"), + ) + .to_df() +) + +# Reassemble each path in pandas: group on the path id, order by step index. +assembly_df["step"] = assembly_df["step"].astype(int) +paths = [] +for path_id, grp in assembly_df.groupby("path"): + ordered = grp.sort_values("step") + ids = ordered["sku_id"].tolist() + names = ordered["sku_name"].tolist() + paths.append({"ids": ids, "names": names, "length": len(ids) - 1}) + +print(f"\n Enumerated {len(paths)} assembly path(s) (<= {MAX_ASSEMBLY_HOPS} hops).") + +# (a) All assembly paths, longest first. +print("\n All assembly chains (feeds-into order):") +for path in sorted(paths, key=lambda x: -x["length"]): + print(f" [{path['length']} hop] " + " -> ".join(path["names"])) + +# (b) Maximal paths: keep only the longest, non-extendable chains -- a chain +# that is NOT a contiguous sub-sequence (prefix or suffix) of any longer chain. +# Suppressing these sub-paths leaves just the end-to-end assembly routes. +def _is_contiguous_subsequence(short, long): + """True if `short` appears as a contiguous run inside `long`.""" + if len(short) >= len(long): + return False + return any(long[i:i + len(short)] == short for i in range(len(long) - len(short) + 1)) + + +maximal = [ + path + for path in paths + if not any( + other is not path and _is_contiguous_subsequence(path["ids"], other["ids"]) + for other in paths + ) +] + +print(f"\n Maximal assembly chains ({len(maximal)} of {len(paths)}, sub-paths suppressed):") +for path in sorted(maximal, key=lambda x: -x["length"]): + print(f" [{path['length']} hop] " + " -> ".join(path["names"])) + +# Persist each finished good's longest assembly depth back onto the SKU ontology. +# The terminal SKU of a maximal chain is the thing it builds; the deepest chain +# ending there is that finished good's assembly depth. +SKU.assembly_depth = model.Property(f"{SKU} has assembly depth {Integer:assembly_depth}") +depth_by_sku = {} +for path in maximal: + terminal = path["ids"][-1] + depth_by_sku[terminal] = max(depth_by_sku.get(terminal, 0), path["length"]) + +if depth_by_sku: + depth_rows = pd.DataFrame( + [{"sku_id": sku_id, "assembly_depth": depth} for sku_id, depth in depth_by_sku.items()] + ) + depth_data = model.data(depth_rows) + model.define(SKU.assembly_depth(depth_data.assembly_depth)).where( + SKU.id == depth_data.sku_id + ) + + print("\n Assembly depth persisted onto SKU (longest chain terminating at each):") + depth_df = ( + where(SKU.assembly_depth > 0) + .select( + SKU.id.alias("sku_id"), + SKU.name.alias("sku_name"), + SKU.type.alias("type"), + SKU.assembly_depth.alias("assembly_depth"), + ) + .to_df() + .sort_values("assembly_depth", ascending=False) + .reset_index(drop=True) + ) + print(depth_df.to_string(index=False)) diff --git a/v1/bom-reachability/pyproject.toml b/v1/bom-reachability/pyproject.toml index dc06447..0196ef2 100644 --- a/v1/bom-reachability/pyproject.toml +++ b/v1/bom-reachability/pyproject.toml @@ -9,7 +9,7 @@ description = "RelationalAI template: bill of materials dependency tracing with readme = "README.md" requires-python = ">=3.10" dependencies = [ - "relationalai==1.11.0", + "relationalai==1.13.0", "pandas>=2.0.0", ] From 58f08779423a146830b571dc0a9fa82b19b6fb64 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 10:09:38 -0700 Subject: [PATCH 04/14] telco_network_recovery: add Stage 3.5 call-path enumeration (PREVIEW) Arity-3 Subscriber.calls_via edge (caller via routed_through tower -> callee) from CallDetailRecord; enumerate call paths from the top-PageRank hub (scoped; the call graph is large/cyclic), recover the routing tower per hop via relationship_fields, rank by PageRank summed along the route, persist Subscriber.top_call_path_influence. Refactored to the live-validated explicit-src + pandas-field_index form (matches telco_validate.py: 6376 simple <=3-hop paths, 120 towers). py_compile + ruff green; pin already 1.13. Full-template live-run is the pre-merge gate. --- .../telco_network_recovery.py | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/v1/telco_network_recovery/telco_network_recovery.py b/v1/telco_network_recovery/telco_network_recovery.py index 49197a1..2357e56 100644 --- a/v1/telco_network_recovery/telco_network_recovery.py +++ b/v1/telco_network_recovery/telco_network_recovery.py @@ -30,6 +30,13 @@ of ACTIVE subscribers whose calls route through the tower; PageRank-weighted impact is exposed alongside as a secondary network-effect signal. +- Stage 3.5 -- Paths (PREVIEW, relationalai>=1.13): enumerate caller + -> callee call paths (<=3 hops, simple) from the highest-PageRank + subscriber over an arity-3 `{Subscriber} via {CellTower} calls + {Subscriber}` edge, recovering the routing tower on each hop via + `relationship_fields`. Ranks each scoped subscriber's routes by + Stage 3 PageRank summed along the chain and persists the top route's + influence load as `Subscriber.top_call_path_influence`. - Stage 4 -- Prescriptive: tower-upgrade MIP. Decision variable `TowerUpgradeOption.selected` is binary (one of three tiers per tower). Constraints: at most one tier per tower, total cost <= @@ -805,6 +812,171 @@ class SubscriberStatus(model.Enum): "weighted_pagerank shown as secondary):") print(blast_df.to_string(index=False)) +# -------------------------------------------------- +# Stage 3.5: Paths -- Call-path enumeration through critical towers +# PREVIEW capability; requires relationalai>=1.13. +# -------------------------------------------------- +# Composes on Stage 3's Subscriber.influence_score (PageRank) and Stage 2's +# CellTower.is_critical_restore. Where PageRank scores a *node*, paths scores +# the *call route*: enumerate caller -> ... -> callee chains, recover the +# routing tower on each hop, and rank each scoped subscriber's routes by total +# influence summed along the chain. The routing tower is the auxiliary middle +# field of an arity-3 edge, so each enumerated path also tells us which towers +# the influence flows through -- the join between the graph and rules stages. + +print(f"\n{'=' * 60}") +print("STAGE 3.5: PATHS -- Call-path enumeration through critical towers") +print("=" * 60) + +# Arity-3 call edge: {Subscriber:caller} via {CellTower:tower} calls {Subscriber:callee}. +# First/last fields (caller/callee) are the path endpoints; the tower is the +# auxiliary middle field (field_index 1), recoverable per hop via +# relationship_fields. Derived from CallDetailRecord's caller/callee/routed_through. +Subscriber.calls_via = model.Relationship( + f"{Subscriber:caller} via {CellTower:tower} calls {Subscriber:callee}", + short_name="calls_via", +) +cdr_edge = CallDetailRecord.ref() +caller_sub, callee_sub, via_tower = Subscriber.ref(), Subscriber.ref(), CellTower.ref() +model.where( + cdr_edge.caller(caller_sub), + cdr_edge.callee(callee_sub), + cdr_edge.routed_through(via_tower), +).define(caller_sub.calls_via(via_tower, callee_sub)) + +# Scope the enumeration: the full call graph is large and cyclic, so we anchor +# on a deterministic seed -- the single highest-PageRank subscriber (max +# influence_score, ties broken by id). From that hub we enumerate call paths up +# to 3 hops and keep simple paths only. `path(C.r.repeat(1, 3))` constrains only +# the *source* to type Subscriber; we pin the hub as the start node via .where(). +SEED_HUB_COUNT = 1 # deterministic single-hub scope +PATH_MAX_HOPS = 3 + +influence_df = model.select( + Subscriber.id.alias("sub_id"), + Subscriber.influence_score.alias("influence"), +).to_df() +influence_df["influence"] = influence_df["influence"].astype(float) +seed_hub_ids = ( + influence_df.sort_values(["influence", "sub_id"], ascending=[False, True]) + .head(SEED_HUB_COUNT)["sub_id"] + .tolist() +) +influence_by_id = dict(zip(influence_df["sub_id"], influence_df["influence"])) + +# Per-subscriber influence load of the top-ranked call path, persisted below. +Subscriber.top_call_path_influence = model.Property( + f"{Subscriber} has {Float:top_call_path_influence}" +) +tower_name_df = model.select( + CellTower.id.alias("tower_id"), + CellTower.name.alias("tower_name"), +).to_df() +tower_name_by_id = dict(zip(tower_name_df["tower_id"], tower_name_df["tower_name"])) + +frag_rows = [] +total_simple_paths = 0 +towers_recovered = set() +top_route_overall = None # (influence_load, [sub ids], [tower ids]) + +for hub_id in seed_hub_ids: + # Pin the start node via an explicit src argument to path() + an outer id + # filter (the live-validated form), not a p.nodes(0, ref) post-filter. + seed = Subscriber.ref() + p_nodes = model.where( + seed.id == hub_id, + p := model.path( + seed, Subscriber.calls_via.repeat(1, PATH_MAX_HOPS) + ).all_paths(), + ).select( + p.alias("path_id"), + p.nodes["index"].alias("step"), + Subscriber(p.nodes).id.alias("sub_id"), + ).to_df() + + # Recover the routing tower on each hop: field_index 1 is the CellTower in + # the arity-3 edge (caller=0, tower=1, callee=2). Project all edge fields and + # filter to field_index 1 in pandas (the live-validated form) rather than in + # the where clause. + seed_h = Subscriber.ref() + p_hops = model.where( + seed_h.id == hub_id, + p := model.path( + seed_h, Subscriber.calls_via.repeat(1, PATH_MAX_HOPS) + ).all_paths(), + ).select( + p.alias("path_id"), + p.relationship_fields["index"].alias("hop"), + p.relationship_fields["field_index"].alias("fidx"), + CellTower(p.relationship_fields["field"]).id.alias("tower_id"), + ).to_df() + if not p_hops.empty: + p_hops = p_hops[p_hops["fidx"].astype(int) == 1] + + if p_nodes.empty: + continue + p_nodes["step"] = p_nodes["step"].astype(int) + + # Reassemble each path in pandas: order nodes by step, keep SIMPLE paths + # only (the call graph is cyclic), then weight by PageRank summed along the + # subscribers on the route. + tower_by_path = {} + if not p_hops.empty: + p_hops["hop"] = p_hops["hop"].astype(int) + for path_id, hop_grp in p_hops.groupby("path_id"): + towers = hop_grp.sort_values("hop")["tower_id"].tolist() + tower_by_path[path_id] = towers + + hub_best_load = None + for path_id, grp in p_nodes.groupby("path_id"): + ordered = grp.sort_values("step") + sub_ids = ordered["sub_id"].tolist() + if len(set(sub_ids)) != len(sub_ids): + continue # simple paths only (the call graph is cyclic) + total_simple_paths += 1 + route_towers = tower_by_path.get(path_id, []) + towers_recovered.update(route_towers) + # PageRank summed along the route (pandas .sum(); `sum` may be shadowed + # by relationalai.semantics in these templates -- never call it on + # Python data). + influence_load = round( + float(pd.Series([influence_by_id.get(s, 0.0) for s in sub_ids]).sum()), 6 + ) + if hub_best_load is None or influence_load > hub_best_load: + hub_best_load = influence_load + if top_route_overall is None or influence_load > top_route_overall[0]: + top_route_overall = (influence_load, sub_ids, route_towers) + + # Persist this hub's best (max influence-load) simple call path. + if hub_best_load is not None: + frag_rows.append( + {"sub_id": hub_id, "top_call_path_influence": hub_best_load} + ) + +# Persist the top-route influence-load as a Subscriber property on the ontology. +if frag_rows: + frag_df = pd.DataFrame(frag_rows) + frag_data = model.data(frag_df) + model.define( + Subscriber.top_call_path_influence(frag_data.top_call_path_influence) + ).where(Subscriber.id == frag_data.sub_id) + +# Concise summary: counts, the top influence-weighted call path, towers recovered. +print( + f"\n Seed scope: {len(seed_hub_ids)} highest-PageRank hub(s) " + f"{seed_hub_ids}; enumeration <= {PATH_MAX_HOPS} hops, simple paths only." +) +print(f" Simple call paths enumerated from the seed hub(s): {total_simple_paths}") +print(f" Distinct routing towers recovered across those paths: {len(towers_recovered)}") +if top_route_overall is not None: + _load, _subs, _towers = top_route_overall + route_str = " -> ".join(_subs) + tower_str = ( + " via towers [" + ", ".join(tower_name_by_id.get(t, t) for t in _towers) + "]" + if _towers else "" + ) + print(f" Top influence-weighted call path (PageRank sum {_load}): {route_str}{tower_str}") + # -------------------------------------------------- # Stage 4: Prescriptive -- tower upgrade MIP # -------------------------------------------------- From ff574561b896b32117125158243522c31a53fbb0 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 10:11:00 -0700 Subject: [PATCH 05/14] it-dependency-mapping: NEW paths template (downstream dependency paths) Single-reasoner (Graph/paths) template, Technology & Telecom domain. Feature .contributes_to self-relationship (acyclic DAG); model.path(Feature.contributes_to .repeat(1,N)).all_paths() enumerates downstream dependency paths, reduces to maximal chains, persists Feature.max_downstream_depth. 14 features / 15 edges; 46 paths -> 6 maximal, longest 5 hops. Uses the live-validated single-relationship path form. py_compile + ruff green; pin 1.13.0. Full-template live-run is the pre-merge gate. --- v1/it-dependency-mapping/README.md | 252 ++++++++++++++++++ .../data/dependencies.csv | 16 ++ v1/it-dependency-mapping/data/features.csv | 15 ++ .../it_dependency_mapping.py | 232 ++++++++++++++++ v1/it-dependency-mapping/pyproject.toml | 17 ++ 5 files changed, 532 insertions(+) create mode 100644 v1/it-dependency-mapping/README.md create mode 100644 v1/it-dependency-mapping/data/dependencies.csv create mode 100644 v1/it-dependency-mapping/data/features.csv create mode 100644 v1/it-dependency-mapping/it_dependency_mapping.py create mode 100644 v1/it-dependency-mapping/pyproject.toml diff --git a/v1/it-dependency-mapping/README.md b/v1/it-dependency-mapping/README.md new file mode 100644 index 0000000..59031ef --- /dev/null +++ b/v1/it-dependency-mapping/README.md @@ -0,0 +1,252 @@ +--- +title: "IT Dependency Mapping" +description: "Map the downstream dependency structure of a software and data-pipeline estate by enumerating variable-length traversal paths over an acyclic dependency graph, then surface the longest end-to-end chains and the owners along them." +experience_level: intermediate +industry: "Technology & Telecom" +featured: false +reasoning_types: + - Graph +tags: + - graph-analytics + - paths + - variable-length-traversal + - dependency-mapping + - data-lineage + - technology +sidebar: + order: 6 +--- + +## What this template is for + +Modern software estates are webs of dependencies: raw data sources feed ingestion pipelines, pipelines feed feature jobs, jobs back services and APIs, and services power dashboards. When someone proposes changing a pipeline -- or one goes down at 2am -- the question is always the same: what is downstream of this, and how far does the blast radius reach? Direct dependencies are easy to list; the full transitive chains are not. + +This template demonstrates **Graph** reasoning -- specifically variable-length path traversal -- over a dependency DAG: + +1. **Path enumeration** (`model.path(Feature.contributes_to.repeat(1, N)).all_paths()`) -- Walk every downstream dependency path of every length. Because the estate is acyclic, each enumerated path is a simple chain. +2. **Maximal-chain reduction** -- Collapse the full path set down to the longest non-extendable chains, dropping the shorter sub-chains contained inside them, so the end-to-end propagation paths stand out. + +## Who this is for + +- **Intermediate users** who want to learn variable-length path traversal on a directed acyclic graph +- **Platform and data engineers** assessing the downstream impact of a pipeline change or outage +- **Reliability owners** who need to know which chains, and which people, sit between a root component and its consumers + +## What you'll build + +- Load a 14-feature, 15-edge dependency estate from CSV (raw sources, pipelines, feature jobs, services, dashboards) +- Define a `contributes_to` self-relationship forming an acyclic dependency DAG +- Enumerate every downstream dependency path with `model.path(...).all_paths()` +- Report per-feature path counts and longest downstream depth +- Reduce the path set to its maximal chains (longest non-extendable dependency chains) +- Persist each feature's longest downstream depth back to the ontology as `Feature.max_downstream_depth` +- Trace the owners along the single longest chain -- the worst-case change/incident blast radius + +## What's included + +- **Self-contained script**: `it_dependency_mapping.py` -- Runs the full analysis end-to-end +- **Data**: `data/features.csv` (14 features across tiers) and `data/dependencies.csv` (15 dependency edges) + +## Prerequisites + +- Python >= 3.10 +- A Snowflake account that has the RAI Native App installed. +- A Snowflake user with permissions to access the RAI Native App. +- `relationalai >= 1.13` -- the path-traversal API is a preview capability introduced in 1.13. + +## Quickstart + +1. Download and extract this template: + + ```bash + curl -O https://docs.relational.ai/templates/zips/v1/it-dependency-mapping.zip + unzip it-dependency-mapping.zip + cd it-dependency-mapping + ``` + + > [!TIP] + > You can also download the template ZIP using the "Download ZIP" button at the top of this page. + +2. **Create and activate a virtual environment** + + ```bash + python -m venv .venv + source .venv/bin/activate + python -m pip install -U pip + ``` + +3. **Install dependencies** + + ```bash + python -m pip install . + ``` + +4. **Configure Snowflake connection and RAI profile** + + ```bash + rai init + ``` + +5. **Run the template** + + ```bash + python it_dependency_mapping.py + ``` + +## Template structure + +```text +it-dependency-mapping/ +├── it_dependency_mapping.py # Self-contained analysis script +├── pyproject.toml # Dependencies and project metadata +├── README.md # This file +└── data/ + ├── features.csv # 14 features (id, name, owner, deploy_tier) + └── dependencies.csv # 15 dependency edges (from_feature, to_feature) +``` + +## How it works + +```text +CSV files --> Define Feature + contributes_to --> Enumerate downstream paths --> Reduce to maximal chains --> Trace owners along longest chain +``` + +### 1. Load Ontology + +A `Feature` is any node in the estate -- a raw source, pipeline, feature job, service, or dashboard. The `contributes_to` self-relationship records that an upstream feature feeds a downstream one, forming the dependency DAG: + +```python +Feature = model.Concept("Feature", identify_by={"id": String}) +Feature.name = model.Property(f"{Feature} has {String:name}") +Feature.owner = model.Property(f"{Feature} owned by {String:owner}") +Feature.deploy_tier = model.Property(f"{Feature} has deploy tier {String:deploy_tier}") + +Feature.contributes_to = model.Relationship( + f"{Feature} contributes to {Feature}", short_name="contributes_to" +) +``` + +### 2. Enumerate Downstream Paths + +`model.path(Feature.contributes_to.repeat(1, MAX_DEPTH))` describes a variable-length traversal of 1 to `MAX_DEPTH` `contributes_to` edges. `all_paths()` enumerates every such path. Each result is a `PathTraversal`: `p.length` is its hop count and `p.nodes` is the ordered sequence of features it visits: + +```python +p_pattern = model.path(Feature.contributes_to.repeat(1, MAX_DEPTH)) +paths_df = ( + model.where(p := p_pattern.all_paths()) + .select( + p.alias("path_id"), + p.length.alias("hops"), + p.nodes["index"].alias("step"), + Feature(p.nodes).id.alias("feature_id"), + Feature(p.nodes).name.alias("feature_name"), + ) + .to_df() +) +``` + +Each path arrives as one row per visited node. Grouping on the path-id column and ordering by node index reassembles the ordered chain: + +```python +chains = ( + paths_df.sort_values(["path_id", "step"]) + .groupby("path_id") + .agg( + hops=("hops", "first"), + node_ids=("feature_id", lambda s: tuple(s)), + chain=("feature_name", lambda s: " -> ".join(s)), + ) + .reset_index() +) +``` + +### 3. Persist Longest Downstream Depth + +The longest path starting at each feature is its downstream depth. Writing it back as a first-class property lets a downstream query rank features by reach without re-enumerating paths: + +```python +Feature.max_downstream_depth = model.Property( + f"{Feature} has {Integer:max_downstream_depth}" +) +depth_data = model.data(depth_rows) +model.define( + Feature.max_downstream_depth(depth_data.max_downstream_depth) +).where(Feature.id == depth_data.feature_id) +``` + +### 4. Reduce to Maximal Chains + +The full path set contains every sub-chain. A path is *maximal* when its node sequence is not a contiguous sub-chain of any other enumerated path -- it cannot be extended upstream or downstream. Filtering to maximal chains leaves only the end-to-end propagation paths: + +```python +maximal = chains[ + chains["node_ids"].apply( + lambda seq: not any( + is_sub_chain(seq, other) for other in all_sequences if other != seq + ) + ) +].copy() +``` + +The single longest maximal chain is the worst-case blast radius: joining owner metadata onto its features shows every person a change at the root would have to clear before it reaches the final consumer. + +## Expected output + +The deepest chain runs five hops from a raw source all the way to a downstream dashboard, crossing several owners: + +```text +=== Maximal dependency chains (6 of 46 paths) === + 5 hops: Clickstream Ingest -> Events Enrichment Pipeline -> Session Feature Job -> Churn Feature Store -> Churn Scoring API -> Retention Dashboard + 4 hops: CRM Sync -> Customer 360 Build -> Churn Feature Store -> Churn Scoring API -> Retention Dashboard + 4 hops: Clickstream Ingest -> Events Enrichment Pipeline -> Session Feature Job -> Recommendation Service -> Retention Dashboard + 4 hops: Transaction CDC Stream -> Customer 360 Build -> Churn Feature Store -> Churn Scoring API -> Retention Dashboard + 4 hops: Transaction CDC Stream -> Ledger Normalizer -> Revenue Rollup -> Billing API -> Executive Revenue Dashboard + 3 hops: Transaction CDC Stream -> Ledger Normalizer -> Revenue Rollup -> Executive Revenue Dashboard + +=== Longest dependency chain: 5 hops === + Chain: Clickstream Ingest -> Events Enrichment Pipeline -> Session Feature Job -> Churn Feature Store -> Churn Scoring API -> Retention Dashboard + Spans 6 features and 5 owners: + - Clickstream Ingest owner=Maya Chen tier=critical + - Events Enrichment Pipeline owner=Sofia Rossi tier=high + - Session Feature Job owner=Liang Wu tier=high + - Churn Feature Store owner=Liang Wu tier=high + - Churn Scoring API owner=Priya Nair tier=high + - Retention Dashboard owner=Hana Kim tier=standard + + A change at 'Clickstream Ingest' propagates through 5 downstream feature(s) before reaching 'Retention Dashboard'. +``` + +## Customize this template + +**Use your own data:** +- Replace the CSVs in `data/` with your own features and dependency edges, keeping the same column names. +- `deploy_tier` is illustrative metadata; swap in your own (criticality, environment, SLA class) and group the output by it. + +**Extend the analysis:** +- Raise `MAX_DEPTH` if your estate has longer chains than the sample. +- Filter the enumerated paths to those that end at a `critical`-tier feature to find the chains that matter most. +- Add edge attributes (latency, freshness SLA) and sum them along each path to rank chains by cumulative risk, not just length. + +## Troubleshooting + +
+ Why does model.path(...) raise an AttributeError or ImportError? + +- The path-traversal API is a preview capability introduced in `relationalai` 1.13. Confirm your installed version with `python -c "import relationalai; print(relationalai.__version__)"` and upgrade if it is older. + +
+ +
+ Why does path enumeration hang or return far more paths than expected? + +- `all_paths()` enumerates walks. On an acyclic estate every walk is a simple chain, but if you add an edge that introduces a cycle the enumeration can explode. Keep `contributes_to` acyclic, and use `MAX_DEPTH` to bound traversal. + +
+ +
+ Why does authentication/configuration fail? + +- Run `rai init` to create/update `raiconfig.toml`. +- If you have multiple profiles, set `RAI_PROFILE` or switch profiles in your config. + +
diff --git a/v1/it-dependency-mapping/data/dependencies.csv b/v1/it-dependency-mapping/data/dependencies.csv new file mode 100644 index 0000000..ff5e4f7 --- /dev/null +++ b/v1/it-dependency-mapping/data/dependencies.csv @@ -0,0 +1,16 @@ +from_feature,to_feature +clickstream-ingest,events-enrichment +txn-cdc-stream,ledger-normalizer +txn-cdc-stream,customer-360-build +crm-sync,customer-360-build +events-enrichment,session-features +ledger-normalizer,revenue-rollup +session-features,churn-feature-store +customer-360-build,churn-feature-store +session-features,recommendation-svc +churn-feature-store,churn-scoring-api +revenue-rollup,billing-api +revenue-rollup,exec-revenue-dashboard +billing-api,exec-revenue-dashboard +churn-scoring-api,retention-dashboard +recommendation-svc,retention-dashboard diff --git a/v1/it-dependency-mapping/data/features.csv b/v1/it-dependency-mapping/data/features.csv new file mode 100644 index 0000000..040ddb1 --- /dev/null +++ b/v1/it-dependency-mapping/data/features.csv @@ -0,0 +1,15 @@ +id,name,owner,deploy_tier +clickstream-ingest,Clickstream Ingest,Maya Chen,critical +txn-cdc-stream,Transaction CDC Stream,Maya Chen,critical +crm-sync,CRM Sync,Raj Patel,standard +events-enrichment,Events Enrichment Pipeline,Sofia Rossi,high +ledger-normalizer,Ledger Normalizer,Sofia Rossi,critical +customer-360-build,Customer 360 Build,Raj Patel,high +session-features,Session Feature Job,Liang Wu,high +revenue-rollup,Revenue Rollup,Sofia Rossi,critical +churn-feature-store,Churn Feature Store,Liang Wu,high +recommendation-svc,Recommendation Service,Liang Wu,high +churn-scoring-api,Churn Scoring API,Priya Nair,high +billing-api,Billing API,Tom Becker,critical +exec-revenue-dashboard,Executive Revenue Dashboard,Hana Kim,standard +retention-dashboard,Retention Dashboard,Hana Kim,standard diff --git a/v1/it-dependency-mapping/it_dependency_mapping.py b/v1/it-dependency-mapping/it_dependency_mapping.py new file mode 100644 index 0000000..e1d2e60 --- /dev/null +++ b/v1/it-dependency-mapping/it_dependency_mapping.py @@ -0,0 +1,232 @@ +"""IT dependency mapping (graph paths) template. + +Maps the downstream dependency structure of a software / data-pipeline estate +by enumerating variable-length traversal paths over an acyclic dependency DAG: + +- Loads Feature nodes (services, pipelines, jobs, dashboards) and a + contributes_to self-relationship (feeds / contributes to) from CSV. +- Enumerates every downstream dependency path with + ``model.path(Feature.contributes_to.repeat(1, N)).all_paths()`` -- because the + estate is acyclic, every enumerated path is simple. +- Reduces the full path set to its maximal chains (longest non-extendable + dependency chains, with shorter sub-chains dropped). +- Persists each feature's longest downstream depth back to the ontology as + Feature.max_downstream_depth, then reports per-feature path counts, the + deepest dependency chains, and the owners that sit along the longest chain. + +Run: + /opt/homebrew/bin/python3.11 it_dependency_mapping.py + +Output: + Prints the total downstream path count, per-feature path counts and longest + downstream depth, the deepest maximal dependency chains, and the chain of + owners along the single longest chain (the upgrade / incident blast radius). +""" + +from pathlib import Path + +import pandas as pd +from relationalai.semantics import Integer, Model, String + +# -------------------------------------------------- +# Configure inputs +# -------------------------------------------------- + +DATA_DIR = Path(__file__).parent / "data" + +# Maximum traversal depth (in edges) when enumerating downstream paths. The +# estate is acyclic, so this only needs to cover the longest real chain. +MAX_DEPTH = 8 + + +def load_csv(path): + return pd.read_csv(path) + + +# -------------------------------------------------- +# Define semantic model & load data +# -------------------------------------------------- + +model = Model("it_dependency_mapping") + +# Feature concept: a service, data pipeline, feature job, or dashboard. +Feature = model.Concept("Feature", identify_by={"id": String}) +Feature.name = model.Property(f"{Feature} has {String:name}") +Feature.owner = model.Property(f"{Feature} owned by {String:owner}") +Feature.deploy_tier = model.Property(f"{Feature} has deploy tier {String:deploy_tier}") + +# Feature.contributes_to: a self-relationship -- the upstream feature feeds / +# contributes to the downstream feature, forming an acyclic dependency DAG. +Feature.contributes_to = model.Relationship( + f"{Feature} contributes to {Feature}", short_name="contributes_to" +) + +# Load feature data from CSV. +feature_data = model.data(load_csv(DATA_DIR / "features.csv")) +model.define(Feature.new(id=feature_data["id"])) +model.where(Feature.id == feature_data["id"]).define( + Feature.name(feature_data["name"]), + Feature.owner(feature_data["owner"]), + Feature.deploy_tier(feature_data["deploy_tier"]), +) + +# Load dependency edges from CSV (upstream feature -> downstream feature). +dependency_data = model.data(load_csv(DATA_DIR / "dependencies.csv")) +upstream, downstream = Feature.ref(), Feature.ref() +model.where( + upstream.id == dependency_data["from_feature"], + downstream.id == dependency_data["to_feature"], +).define(upstream.contributes_to(downstream)) + +# -------------------------------------------------- +# Paths: enumerate downstream dependency paths +# PREVIEW capability; requires relationalai>=1.13. +# -------------------------------------------------- +# model.path(Feature.contributes_to.repeat(1, MAX_DEPTH)) describes a +# variable-length traversal of 1..MAX_DEPTH contributes_to edges. all_paths() +# enumerates every such path; because contributes_to is acyclic, each path is +# simple. The result is one PathTraversal per path -- p.length is its hop count +# and p.nodes is the ordered sequence of Feature nodes it visits. + +print("=== IT Dependency Mapping: downstream paths ===") + +p_pattern = model.path(Feature.contributes_to.repeat(1, MAX_DEPTH)) +paths_df = ( + model.where(p := p_pattern.all_paths()) + .select( + p.alias("path_id"), + p.length.alias("hops"), + p.nodes["index"].alias("step"), + Feature(p.nodes).id.alias("feature_id"), + Feature(p.nodes).name.alias("feature_name"), + ) + .to_df() +) +paths_df["hops"] = paths_df["hops"].astype(int) +paths_df["step"] = paths_df["step"].astype(int) + +# Reassemble each path: group on the path-id column, order steps by node index, +# and join the visited features into an ordered chain. +chains = ( + paths_df.sort_values(["path_id", "step"]) + .groupby("path_id") + .agg( + hops=("hops", "first"), + node_ids=("feature_id", lambda s: tuple(s)), + chain=("feature_name", lambda s: " -> ".join(s)), + ) + .reset_index() +) + +print(f"\nTotal downstream dependency paths (1-{MAX_DEPTH} hops): {len(chains)}") + +# -------------------------------------------------- +# Per-feature path counts and longest downstream depth +# -------------------------------------------------- + +feature_df = model.select( + Feature.id.alias("feature_id"), + Feature.name.alias("feature_name"), + Feature.owner.alias("owner"), + Feature.deploy_tier.alias("deploy_tier"), +).to_df() + +# Paths starting at each feature, and the longest path starting there. +chains["start_id"] = chains["node_ids"].apply(lambda ids: ids[0]) +paths_started = chains.groupby("start_id").size().rename("paths_downstream") +max_depth_by_id = chains.groupby("start_id")["hops"].max().rename("max_downstream_depth") + +feature_summary = ( + feature_df.merge(paths_started, left_on="feature_id", right_index=True, how="left") + .merge(max_depth_by_id, left_on="feature_id", right_index=True, how="left") +) +feature_summary["paths_downstream"] = ( + feature_summary["paths_downstream"].fillna(0).astype(int) +) +feature_summary["max_downstream_depth"] = ( + feature_summary["max_downstream_depth"].fillna(0).astype(int) +) + +print("\n=== Per-feature downstream reach ===") +print( + f" {'Feature':<30} {'Tier':<9} {'Paths':>6} {'Max Depth':>10}" +) +print(f" {'-' * 57}") +for _, r in feature_summary.sort_values( + ["max_downstream_depth", "paths_downstream"], ascending=False +).iterrows(): + print( + f" {r['feature_name']:<30} {r['deploy_tier']:<9} " + f"{r['paths_downstream']:>6} {r['max_downstream_depth']:>10}" + ) + +# Persist each feature's longest downstream depth back to the ontology so a +# downstream query can rank features by reach without re-enumerating paths. +Feature.max_downstream_depth = model.Property( + f"{Feature} has {Integer:max_downstream_depth}" +) +depth_rows = feature_summary[["feature_id", "max_downstream_depth"]] +depth_data = model.data(depth_rows) +model.define( + Feature.max_downstream_depth(depth_data.max_downstream_depth) +).where(Feature.id == depth_data.feature_id) + +# -------------------------------------------------- +# Maximal chains: longest non-extendable dependency chains +# -------------------------------------------------- +# A path is maximal when its node sequence is not a contiguous sub-chain of any +# other enumerated path -- i.e. it cannot be extended upstream or downstream. +# This collapses the full path set to the small set of end-to-end chains. + +all_sequences = list(chains["node_ids"]) + + +def is_sub_chain(short, long): + """True if `short` is a contiguous sub-sequence of the longer `long`.""" + if len(short) >= len(long): + return False + return any( + long[i : i + len(short)] == short + for i in range(len(long) - len(short) + 1) + ) + + +maximal = chains[ + chains["node_ids"].apply( + lambda seq: not any( + is_sub_chain(seq, other) for other in all_sequences if other != seq + ) + ) +].copy() +maximal = maximal.sort_values(["hops", "chain"], ascending=[False, True]) + +print( + f"\n=== Maximal dependency chains ({len(maximal)} of {len(chains)} paths) ===" +) +for _, r in maximal.iterrows(): + print(f" {r['hops']} hops: {r['chain']}") + +# -------------------------------------------------- +# Owners along the single longest chain (blast radius) +# -------------------------------------------------- +# The longest chain is the worst-case propagation path: a change at its root has +# to clear every owner along the way. Join owner metadata onto its features. + +owner_by_id = dict(zip(feature_df["feature_id"], feature_df["owner"])) +name_by_id = dict(zip(feature_df["feature_id"], feature_df["feature_name"])) +tier_by_id = dict(zip(feature_df["feature_id"], feature_df["deploy_tier"])) + +longest = maximal.iloc[0] +longest_ids = longest["node_ids"] +distinct_owners = len({owner_by_id[i] for i in longest_ids}) + +print(f"\n=== Longest dependency chain: {longest['hops']} hops ===") +print(f" Chain: {longest['chain']}") +print(f" Spans {len(longest_ids)} features and {distinct_owners} owners:") +for i in longest_ids: + print(f" - {name_by_id[i]:<30} owner={owner_by_id[i]:<14} tier={tier_by_id[i]}") +print( + f"\n A change at '{name_by_id[longest_ids[0]]}' propagates through " + f"{len(longest_ids) - 1} downstream feature(s) before reaching " + f"'{name_by_id[longest_ids[-1]]}'." +) diff --git a/v1/it-dependency-mapping/pyproject.toml b/v1/it-dependency-mapping/pyproject.toml new file mode 100644 index 0000000..5bb4874 --- /dev/null +++ b/v1/it-dependency-mapping/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "rai-template-it-dependency-mapping" +version = "0.0.0" +description = "RelationalAI template: map downstream software / data-pipeline dependency chains with variable-length path traversal over a dependency DAG" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "relationalai==1.13.0", + "pandas>=2.0.0", +] + +[tool.setuptools] +packages = [] From 50932e81f238dee9d4163f32e51fbed0436ee7fa Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 10:12:30 -0700 Subject: [PATCH 06/14] bom-reachability: README How-it-works section for assembly-path enumeration --- v1/bom-reachability/README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/v1/bom-reachability/README.md b/v1/bom-reachability/README.md index c1576ce..af26e37 100644 --- a/v1/bom-reachability/README.md +++ b/v1/bom-reachability/README.md @@ -144,6 +144,18 @@ betweenness = graph.betweenness_centrality() Components with high betweenness are structural bottlenecks -- disrupting them affects the most product lines. +### 5. Enumerate Assembly Paths (PREVIEW, requires `relationalai>=1.13`) + +Where reachability returns dependency *pairs*, path enumeration returns the actual *build sequences*. It derives a SKU-to-SKU `feeds` edge from the `BillOfMaterials` intermediary (input SKU feeds output SKU) and enumerates every assembly path; because the BOM is acyclic, `.all_paths()` yields exactly the simple paths -- no cycle risk. A maximal-paths view keeps only the longest non-extendable chains, and the longest assembly depth is persisted as `SKU.assembly_depth`. + +```python +SKU.feeds = model.Relationship(f"{SKU} feeds into {SKU}", short_name="feeds") +p = model.path(SKU.feeds.repeat(1, MAX_ASSEMBLY_HOPS)).all_paths() +paths_df = model.where(p).select( + p.alias("path"), p.nodes["index"].alias("step"), SKU(p.nodes).name.alias("sku_name") +).to_df() +``` + ## Customize this template **Use your own data:** From ed87e265ac0b059aad60a7098c75c56bfd073949 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 10:13:17 -0700 Subject: [PATCH 07/14] telco_network_recovery: runbook step 6b + README property for Stage 3.5 paths Runbook: question-shaped 'trace most-influential call paths' step (PageRank anchored by structural test, scoped to a seed hub). README: Subscriber .top_call_path_influence row in the concepts table. --- v1/telco_network_recovery/README.md | 1 + v1/telco_network_recovery/runbook.md | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/v1/telco_network_recovery/README.md b/v1/telco_network_recovery/README.md index 6d9c538..256ff9a 100644 --- a/v1/telco_network_recovery/README.md +++ b/v1/telco_network_recovery/README.md @@ -236,6 +236,7 @@ One shared ontology threads all four stages. Each stage reads concepts and prope | `churn_risk_score` | Float | No | `[0, 1]` — probability of churn | | `customer_value` | Float | No | Precomputed: `lifetime_value × (1 + churn_risk_score)` — the per-subscriber weight Stage 3 sums into `weighted_impact` | | `influence_score` | Float | No | **Stage 3** PageRank on the call graph | +| `top_call_path_influence` | Float | No | **Stage 3.5** (PREVIEW) most-influential call path's summed PageRank, for the seed hub | **`TowerUpgradeOption`** — a (tower, tier) candidate upgrade. The MIP's decision space. diff --git a/v1/telco_network_recovery/runbook.md b/v1/telco_network_recovery/runbook.md index 19fb471..a1d8294 100644 --- a/v1/telco_network_recovery/runbook.md +++ b/v1/telco_network_recovery/runbook.md @@ -116,6 +116,18 @@ Four derived health properties (`avg_packet_loss`, `avg_latency_ms`, `avg_error_ `Subscriber.influence_score` (PageRank, 1,200 subs) plus two per-tower properties on the 142 critical towers: `CellTower.weighted_impact` (headline — sum of `Subscriber.customer_value = LTV × (1 + churn_risk_score)` over ACTIVE callers routed through; CDR-weighted so heavy callers count more than once) and `CellTower.weighted_pagerank` (secondary — sum of PageRank influence over the same set). The prescriptive MIP consumes `weighted_impact` in its objective. +### 6b. Trace the most-influential call paths (PREVIEW, requires `relationalai>=1.13`) + +**Prompt** + +``` +/rai-graph-analysis Starting from our most influential subscriber, what are the call paths of up to three hops through the network, which cell tower carried each hop, and which path carries the most social influence — the path running through the subscribers with the highest combined importance, where a subscriber is influential when influential subscribers call them? Don't revisit a subscriber. +``` + +**Response** + +Where PageRank scores a *subscriber*, paths scores the *route*. From the top-PageRank hub, call paths (≤ 3 hops, simple) are enumerated over an arity-3 caller-via-tower-callee edge; the routing tower on each hop is recovered, and each path is ranked by summed PageRank. Persists the hub's top route as `Subscriber.top_call_path_influence`. (Scoped to a seed hub — the full call graph is large and cyclic.) + ### 7. Optimize tier selection **Prompt** From 1e12ba0a2c900cd0d994882ad622a94d100bb8b3 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 10:37:04 -0700 Subject: [PATCH 08/14] bom-reachability: make README assembly-paths snippet verbatim from script --- v1/bom-reachability/README.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/v1/bom-reachability/README.md b/v1/bom-reachability/README.md index af26e37..abd3d51 100644 --- a/v1/bom-reachability/README.md +++ b/v1/bom-reachability/README.md @@ -151,9 +151,16 @@ Where reachability returns dependency *pairs*, path enumeration returns the actu ```python SKU.feeds = model.Relationship(f"{SKU} feeds into {SKU}", short_name="feeds") p = model.path(SKU.feeds.repeat(1, MAX_ASSEMBLY_HOPS)).all_paths() -paths_df = model.where(p).select( - p.alias("path"), p.nodes["index"].alias("step"), SKU(p.nodes).name.alias("sku_name") -).to_df() +assembly_df = ( + model.where(p) + .select( + p.alias("path"), + p.nodes["index"].alias("step"), + SKU(p.nodes).id.alias("sku_id"), + SKU(p.nodes).name.alias("sku_name"), + ) + .to_df() +) ``` ## Customize this template From cc5dc2bf45298cd3cd09a1e8e5e7bf9c5097d8f7 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 11:03:01 -0700 Subject: [PATCH 09/14] it-dependency-mapping: fix path reassembly fan-out (full-template 1.13 run) The end-to-end run surfaced a bug py_compile/ruff missed: selecting p.length alongside p.nodes fanned out the node rows, so maximal chains showed repeated nodes and wrong hop counts. Drop p.length from the select, dedupe (path_id, step), derive hops = max(step). Re-run verified: 46 paths -> 6 maximal, longest 5 hops (Clickstream Ingest -> ... -> Retention Dashboard, 6 features / 5 owners). --- .../it_dependency_mapping.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/v1/it-dependency-mapping/it_dependency_mapping.py b/v1/it-dependency-mapping/it_dependency_mapping.py index e1d2e60..6e6310f 100644 --- a/v1/it-dependency-mapping/it_dependency_mapping.py +++ b/v1/it-dependency-mapping/it_dependency_mapping.py @@ -95,23 +95,25 @@ def load_csv(path): model.where(p := p_pattern.all_paths()) .select( p.alias("path_id"), - p.length.alias("hops"), p.nodes["index"].alias("step"), Feature(p.nodes).id.alias("feature_id"), Feature(p.nodes).name.alias("feature_name"), ) .to_df() ) -paths_df["hops"] = paths_df["hops"].astype(int) paths_df["step"] = paths_df["step"].astype(int) - -# Reassemble each path: group on the path-id column, order steps by node index, -# and join the visited features into an ordered chain. +# Projecting p.nodes can emit duplicate (path, step) rows -- dedupe to one node +# per step before reassembly. (Do NOT also select p.length here: selecting it +# alongside p.nodes fans the node rows out.) +paths_df = paths_df.drop_duplicates(["path_id", "step"]).sort_values(["path_id", "step"]) + +# Reassemble each path: order steps by node index, join the visited features into +# an ordered chain. The hop count is the max node index (a path over N edges has +# nodes at indices 0..N). chains = ( - paths_df.sort_values(["path_id", "step"]) - .groupby("path_id") + paths_df.groupby("path_id") .agg( - hops=("hops", "first"), + hops=("step", "max"), node_ids=("feature_id", lambda s: tuple(s)), chain=("feature_name", lambda s: " -> ".join(s)), ) From b83ae923ed381367e0b53a71f39823290b56b269 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 11:04:21 -0700 Subject: [PATCH 10/14] energy_grid_planning: count simple corridors, not raw walks (full-template 1.13 run) End-to-end run surfaced that n_corridors counted all enumerated walks (8844), mislabeled 'simple' and mismatching the runbook's 421. Count only simple corridors (len([...]), not the shadowed sum). Re-run confirmed: 421 simple corridors, max betweenness-load 99.833, DFW contingency 5 reroute / 1 lose all -- matches the runbook + paste-test. --- v1/energy_grid_planning/energy_grid_planning.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/v1/energy_grid_planning/energy_grid_planning.py b/v1/energy_grid_planning/energy_grid_planning.py index 17e4499..d6adec6 100644 --- a/v1/energy_grid_planning/energy_grid_planning.py +++ b/v1/energy_grid_planning/energy_grid_planning.py @@ -740,7 +740,13 @@ def most_fragile_corridor_per_dc(removed_id=None): baseline_corridors = most_fragile_corridor_per_dc() -n_corridors = corridor_df["corridor"].nunique() +# Count only SIMPLE corridors (drop node-repeating walks; the grid is cyclic). +# `len([...])`, not `sum(...)`: `sum` is shadowed by the relationalai import. +n_corridors = len([ + cid + for cid, grp in corridor_df.groupby("corridor") + if len(set(grp["substation_id"])) == len(grp["substation_id"]) +]) print( f"\n {n_corridors} generator-sub -> DC-sub corridors (<= {MAX_CORRIDOR_HOPS} hops, simple); " f"most-fragile corridor for {len(baseline_corridors)} DC substation(s):" From 22aea6c1f6180b704d0099c9e9928bedda6ae7d0 Mon Sep 17 00:00:00 2001 From: cafzal Date: Tue, 16 Jun 2026 11:13:42 -0700 Subject: [PATCH 11/14] telco_network_recovery: runbook 6b verified numbers (full-template 1.13 run) End-to-end run confirms the bundled data matches the eval (same seed/counts), so the 6b response now carries the verified output: seed SUB-CON-00900, 198 simple <=3-hop call paths, 54 towers, top route SUB-CON-00900 -> SUB-CON-00814 -> SUB-ENT-0038 -> SUB-CON-00644 (PageRank sum 0.009041). --- v1/telco_network_recovery/runbook.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v1/telco_network_recovery/runbook.md b/v1/telco_network_recovery/runbook.md index a1d8294..10dcddd 100644 --- a/v1/telco_network_recovery/runbook.md +++ b/v1/telco_network_recovery/runbook.md @@ -126,7 +126,7 @@ Four derived health properties (`avg_packet_loss`, `avg_latency_ms`, `avg_error_ **Response** -Where PageRank scores a *subscriber*, paths scores the *route*. From the top-PageRank hub, call paths (≤ 3 hops, simple) are enumerated over an arity-3 caller-via-tower-callee edge; the routing tower on each hop is recovered, and each path is ranked by summed PageRank. Persists the hub's top route as `Subscriber.top_call_path_influence`. (Scoped to a seed hub — the full call graph is large and cyclic.) +Where PageRank scores a *subscriber*, paths scores the *route*. From the top-PageRank hub (SUB-CON-00900), 198 simple call paths (≤ 3 hops) are enumerated over an arity-3 caller-via-tower-callee edge, recovering 54 distinct routing towers; each path is ranked by summed PageRank. Top route: `SUB-CON-00900 → SUB-CON-00814 → SUB-ENT-0038 → SUB-CON-00644` (PageRank sum 0.009041). Persists the hub's top route as `Subscriber.top_call_path_influence`. (Scoped to a seed hub — the full call graph is large and cyclic.) ### 7. Optimize tier selection From 14351f064dfa0f0e5dfd8c3392900a4e73638b72 Mon Sep 17 00:00:00 2001 From: cafzal Date: Mon, 22 Jun 2026 09:18:07 -0700 Subject: [PATCH 12/14] templates: bump 4 paths-template pins 1.13.0->1.15.0 (RPQ backend) Re-ran all four end-to-end on pyrel 1.15; path counts reproduce exactly under the RPQ translator (no regression): - bom-reachability: 18 assembly paths, 8 maximal - it-dependency-mapping: 46 paths, 6 maximal, 5-hop longest - energy_grid_planning: 421 corridors, fragility 99.833, Stage 4 OPTIMAL - telco_network_recovery: 198 call paths, 54 towers, Stage 4 MIP OPTIMAL Notably the telco arity-3 calls_via edge (caller, tower, callee -- entity last) enumerates correctly under RPQ; the entity-last ordering is the safe shape. --- v1/bom-reachability/pyproject.toml | 2 +- v1/energy_grid_planning/pyproject.toml | 2 +- v1/it-dependency-mapping/pyproject.toml | 2 +- v1/telco_network_recovery/pyproject.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/v1/bom-reachability/pyproject.toml b/v1/bom-reachability/pyproject.toml index 0196ef2..13e64c9 100644 --- a/v1/bom-reachability/pyproject.toml +++ b/v1/bom-reachability/pyproject.toml @@ -9,7 +9,7 @@ description = "RelationalAI template: bill of materials dependency tracing with readme = "README.md" requires-python = ">=3.10" dependencies = [ - "relationalai==1.13.0", + "relationalai==1.15.0", "pandas>=2.0.0", ] diff --git a/v1/energy_grid_planning/pyproject.toml b/v1/energy_grid_planning/pyproject.toml index cc9fcf6..3371663 100644 --- a/v1/energy_grid_planning/pyproject.toml +++ b/v1/energy_grid_planning/pyproject.toml @@ -9,7 +9,7 @@ description = "RelationalAI template: energy_grid_planning (PyRel v1)" readme = "README.md" requires-python = ">=3.10" dependencies = [ - "relationalai==1.13.0", + "relationalai==1.15.0", "pandas>=2.0", ] diff --git a/v1/it-dependency-mapping/pyproject.toml b/v1/it-dependency-mapping/pyproject.toml index 5bb4874..a6cb412 100644 --- a/v1/it-dependency-mapping/pyproject.toml +++ b/v1/it-dependency-mapping/pyproject.toml @@ -9,7 +9,7 @@ description = "RelationalAI template: map downstream software / data-pipeline de readme = "README.md" requires-python = ">=3.10" dependencies = [ - "relationalai==1.13.0", + "relationalai==1.15.0", "pandas>=2.0.0", ] diff --git a/v1/telco_network_recovery/pyproject.toml b/v1/telco_network_recovery/pyproject.toml index 89b3a11..caa0f98 100644 --- a/v1/telco_network_recovery/pyproject.toml +++ b/v1/telco_network_recovery/pyproject.toml @@ -9,7 +9,7 @@ description = "RelationalAI template: telco_network_recovery (PyRel v1)" readme = "README.md" requires-python = ">=3.10" dependencies = [ - "relationalai==1.13.0", + "relationalai==1.15.0", "pandas>=2.0", ] From 85f5e6d88ff598466e03e7e43321b98d3f8aa26e Mon Sep 17 00:00:00 2001 From: cafzal Date: Mon, 22 Jun 2026 09:21:34 -0700 Subject: [PATCH 13/14] templates: add cybersecurity-attack-paths (multi-edge kill-chain, 1.15) Net-new graph-paths template showcasing the multi-relationship-sequence capability in pyrel 1.15: an attack chain composes DISTINCT technique edges in kill-chain order (exploit a perimeter host, reuse credentials inward, then pivot laterally), which a single unioned edge or a flat join cannot express. - 12-asset enterprise estate, 16 technique-tagged steps; one edge per technique (exploit_to / cred_to / pivot_to) plus a can_reach union. - Multi-edge kill-chain: model.path(a.exploit_to, b.cred_to, c.pivot_to.repeat(1,2), dst) filtered to internet-facing source + crown-jewel dst -> 3 chains; p.relationships labels the technique per hop. - Point query: all web-01 -> db-01 routes over can_reach (<=6 hops, simple) -> 7 routes. - Exposure ranking (28/26/25) and persisted Asset.on_attack_path (11 of 12 assets). Runs clean on 1.15 (py_compile + ruff); runbook paste-tested by a fresh agent that reproduced all documented numbers (3 chains, 7 routes, 28/26/25, 11 assets) without seeing the script. --- v1/cybersecurity-attack-paths/README.md | 219 +++++++++++++++ .../cybersecurity_attack_paths.py | 258 ++++++++++++++++++ v1/cybersecurity-attack-paths/data/assets.csv | 13 + .../data/attack_steps.csv | 17 ++ v1/cybersecurity-attack-paths/pyproject.toml | 17 ++ v1/cybersecurity-attack-paths/runbook.md | 56 ++++ 6 files changed, 580 insertions(+) create mode 100644 v1/cybersecurity-attack-paths/README.md create mode 100644 v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py create mode 100644 v1/cybersecurity-attack-paths/data/assets.csv create mode 100644 v1/cybersecurity-attack-paths/data/attack_steps.csv create mode 100644 v1/cybersecurity-attack-paths/pyproject.toml create mode 100644 v1/cybersecurity-attack-paths/runbook.md diff --git a/v1/cybersecurity-attack-paths/README.md b/v1/cybersecurity-attack-paths/README.md new file mode 100644 index 0000000..b60e590 --- /dev/null +++ b/v1/cybersecurity-attack-paths/README.md @@ -0,0 +1,219 @@ +--- +title: "Cybersecurity Attack Paths" +description: "Trace multi-step cyber attack chains across an enterprise asset graph by composing attacker techniques in order, then rank the routes that reach crown-jewel systems by their total exposure." +experience_level: intermediate +industry: "Technology & Telecom" +featured: false +reasoning_types: + - Graph +tags: + - graph-analytics + - paths + - multi-edge + - attack-paths + - lateral-movement + - cybersecurity +sidebar: + order: 7 +--- + +## What this template is for + +Attackers rarely reach a crown-jewel system in one move. They chain techniques: exploit an internet-facing host to get a foothold, reuse harvested credentials to move inward, then pivot laterally across the network until they reach a domain controller or a customer database. Security teams need to see those end-to-end chains, not just a list of individual vulnerabilities, so they can cut the routes that actually matter. + +This template enumerates multi-step attack paths across an enterprise asset graph and ranks the ones that reach a crown jewel. **It uses graph path enumeration with multi-edge patterns: a single path can compose distinct relationships in a fixed order, so the analysis follows the real kill-chain sequence (exploit, then credential reuse, then lateral movement) rather than treating every move as interchangeable.** + +## Who this is for + +- **Security analysts and threat modelers** who want to reason about attack paths, not just isolated findings +- **Intermediate users** comfortable reading Python; graph and path terms are explained inline +- **Detection and remediation teams** prioritizing which assets to harden first + +## What you'll build + +- Load a 12-asset enterprise estate (perimeter hosts, internal services and workstations, restricted crown jewels) and 16 directed attack steps from CSV +- Model three distinct attacker techniques as separate edges between assets: vulnerability exploitation, credential reuse, and network pivoting +- Enumerate kill-chain attack paths with a multi-edge pattern that fixes the technique order +- Run a point query that lists every route between one named entry point and one named crown jewel +- Rank the kill-chains by the asset exposure summed along each one +- Persist which assets lie on a crown-jewel attack path back onto the ontology + +## What's included + +- **Self-contained script**: `cybersecurity_attack_paths.py` runs the full analysis end-to-end +- **Data**: `data/assets.csv` (12 assets) and `data/attack_steps.csv` (16 technique-tagged edges) + +## Prerequisites + +- Python 3.10 or newer +- A Snowflake account that has the RAI Native App installed +- A Snowflake user with permissions to access the RAI Native App +- `relationalai` 1.15 or newer (path enumeration with multi-edge patterns is a preview capability) + +## Quickstart + +1. Download and extract this template: + + ```bash + curl -O https://docs.relational.ai/templates/zips/v1/cybersecurity-attack-paths.zip + unzip cybersecurity-attack-paths.zip + cd cybersecurity-attack-paths + ``` + + > [!TIP] + > You can also download the template ZIP using the "Download ZIP" button at the top of this page. + +2. **Create and activate a virtual environment** + + ```bash + python -m venv .venv + source .venv/bin/activate + python -m pip install -U pip + ``` + +3. **Install dependencies** + + ```bash + python -m pip install . + ``` + +4. **Configure Snowflake connection and RAI profile** + + ```bash + rai init + ``` + +5. **Run the template** + + ```bash + python cybersecurity_attack_paths.py + ``` + + Expected output starts with the kill-chains that reach a crown jewel: + + ```text + 3 kill-chain attack path(s) reach a crown jewel (exploit -> cred -> 1-2 pivots, from an internet-facing asset): + [3 hops] VPN Gateway --[exploit]--> Jump Host --[cred]--> File Server --[pivot]--> Customer Database + ``` + + See the runbook for the full output. + +## Template structure + +```text +cybersecurity-attack-paths/ +├── cybersecurity_attack_paths.py +├── pyproject.toml +├── README.md +├── runbook.md +└── data/ + ├── assets.csv + └── attack_steps.csv +``` + +## How it works + +```text +CSV files --> Define Asset + technique edges --> Kill-chain enumeration (multi-edge) + --> Point query (entry to crown jewel) --> Exposure ranking --> Persist Asset.on_attack_path +``` + +### 1. Model assets and one edge per technique + +Each attacker technique is its own directed relationship between assets, plus a technique-agnostic union edge for the point query: + +```python +Asset.exploit_to = model.Relationship(f"{Asset} exploits to {Asset}", short_name="exploit_to") +Asset.cred_to = model.Relationship(f"{Asset} reuses credentials to {Asset}", short_name="cred_to") +Asset.pivot_to = model.Relationship(f"{Asset} pivots to {Asset}", short_name="pivot_to") +# Technique-agnostic union edge: an attacker can move from src to dst by SOME +# technique. Used for the point query (any route between two named assets). +Asset.can_reach = model.Relationship(f"{Asset} can reach {Asset}", short_name="can_reach") +``` + +### 2. Enumerate kill-chain attack paths (multi-edge, requires `relationalai>=1.15`) + +The path pattern composes the three techniques in series. The first hop is an exploit, the second is credential reuse, then one or more lateral pivots, ending at an explicit destination. Filtering the source to an internet-facing asset and the destination to a crown jewel pins the threat model: + +```python +a, b, c, dst = Asset.ref(), Asset.ref(), Asset.ref(), Asset.ref() +kill = model.path(a.exploit_to, b.cred_to, c.pivot_to.repeat(1, MAX_PIVOTS), dst).all_paths() +kill_df = ( + model.where( + kill, + a.internet_facing == "yes", + dst.crown_jewel == "yes", + ) + .select( + kill.alias("path_id"), + kill.nodes["index"].alias("step"), + Asset(kill.nodes).id.alias("asset_id"), + Asset(kill.nodes).name.alias("asset_name"), + ) + .to_df() +) +``` + +The edge order is enforced by the pattern. A single "can move" union edge or a flat join cannot express "exploit first, then credentials, then pivots", which is exactly the kill-chain signature analysts care about. `kill.relationships["relationship"]` reads the technique used at each hop. + +### 3. Point query between two named assets + +Pinning both endpoints by id enumerates every route between a chosen entry point and a chosen crown jewel over the union edge: + +```python +src_pt, dst_pt = Asset.ref(), Asset.ref() +route = model.path(src_pt.can_reach.repeat(1, MAX_ROUTE_HOPS), dst_pt).all_paths() +route_df = ( + model.where( + route, + src_pt.id == ENTRY_ASSET, + dst_pt.id == TARGET_ASSET, + ) + .select( + route.alias("path_id"), + route.nodes["index"].alias("step"), + Asset(route.nodes).id.alias("asset_id"), + ) + .to_df() +) +``` + +### 4. Persist the assets on a crown-jewel attack path + +The assets along the kill-chains are flagged back onto the ontology so a later query can pull them without re-enumerating paths: + +```python +Asset.on_attack_path = model.Property(f"{Asset} on attack path {String:on_attack_path}") +on_path_ids = sorted({i for ch in chains for i in ch["asset_ids"]}) +flag_data = model.data(pd.DataFrame({"id": on_path_ids})) +fa = Asset.ref() +model.where(fa.id == flag_data["id"]).define(fa.on_attack_path("yes")) +``` + +## Customize this template + +**Use your own data:** +- Replace the CSVs in `data/` with your own assets and attack steps, keeping the same column names. Tag each step with the technique an attacker would use (`exploit`, `cred`, `pivot`, or your own taxonomy). +- Mark internet-facing assets and crown jewels with `yes` or `no` so the kill-chain endpoints match your environment. + +**Extend the analysis:** +- Add more techniques (for example `phish` or `escalate`) as additional edges and lengthen the multi-edge pattern. +- Raise `MAX_PIVOTS` or `MAX_ROUTE_HOPS` for larger estates with deeper lateral movement. +- Feed `Asset.exposure_score` from a real vulnerability or attack-surface feed so the ranking reflects live risk. + +## Troubleshooting + +
+ Why do I see relationalai version or path import errors? + +- Path enumeration with multi-edge patterns requires `relationalai` 1.15 or newer. Confirm your installed version with `python -m pip show relationalai`. + +
+ +
+ Why does authentication or configuration fail? + +- Run `rai init` to create or update `raiconfig.toml`. +- If you have multiple profiles, set `RAI_PROFILE` or switch profiles in your config. + +
diff --git a/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py b/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py new file mode 100644 index 0000000..69e1aa8 --- /dev/null +++ b/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py @@ -0,0 +1,258 @@ +"""Cybersecurity attack-path analysis (graph paths) template. + +Enumerates multi-step attack chains across an enterprise asset graph by composing +distinct attacker techniques in series -- a capability unlocked by multi-edge path +patterns (relationalai>=1.15): + +- Loads Asset nodes (hosts, services, accounts) and three DISTINCT directed edges + between them, one per technique: exploit_to (vulnerability exploitation), + cred_to (credential reuse), pivot_to (network lateral movement). +- Enumerates kill-chain attack paths with a multi-relationship sequence: + ``model.path(a.exploit_to, b.cred_to, c.pivot_to.repeat(1, N), dst)`` -- the + classic order (exploit a perimeter host, reuse credentials inward, then pivot to + a crown jewel). The edge order is enforced by the pattern; a single unioned + "can move" edge or a flat join cannot express "exploit FIRST, then creds, then + pivots". ``p.relationships`` labels the technique used at each hop. +- Runs a point query between one named internet-facing entry and one named crown + jewel over a derived ``can_reach`` edge, enumerating every route between them. +- Scores each kill-chain by the asset exposure summed along it, and persists + ``Asset.on_attack_path`` back to the ontology for the assets that lie on a + crown-jewel-reaching chain. + +Run: + /opt/homebrew/bin/python3.11 cybersecurity_attack_paths.py + +Output: + Prints the kill-chain attack paths into crown jewels (with the technique at + each hop), every route between a chosen entry point and a chosen crown jewel, + the kill-chains ranked by total asset exposure, and the assets flagged + on a crown-jewel attack path. +""" + +from pathlib import Path + +import pandas as pd +from relationalai.semantics import Integer, Model, String + +# -------------------------------------------------- +# Configure inputs +# -------------------------------------------------- + +DATA_DIR = Path(__file__).parent / "data" + +# Lateral-movement depth: how many pivot_to hops the kill-chain may take after the +# initial exploit + credential reuse. The estate is small, so a low cap suffices. +MAX_PIVOTS = 2 +# Maximum length (in edges) of a route in the point query between two named assets. +MAX_ROUTE_HOPS = 6 +# The point query's endpoints: a specific internet-facing entry and a crown jewel. +ENTRY_ASSET = "web-01" +TARGET_ASSET = "db-01" + + +def load_csv(filename): + return pd.read_csv(DATA_DIR / filename) + + +# -------------------------------------------------- +# Define semantic model & load data +# -------------------------------------------------- + +model = Model("cybersecurity_attack_paths") + +# Asset concept: a host, service, or account in the enterprise estate. +Asset = model.Concept("Asset", identify_by={"id": String}) +Asset.name = model.Property(f"{Asset} has {String:name}") +Asset.zone = model.Property(f"{Asset} sits in {String:zone}") +Asset.internet_facing = model.Property(f"{Asset} is internet facing {String:internet_facing}") +Asset.crown_jewel = model.Property(f"{Asset} is crown jewel {String:crown_jewel}") +Asset.exposure_score = model.Property(f"{Asset} has exposure score {Integer:exposure_score}") + +# One directed self-relationship per attacker technique. An edge means "an attacker +# on the source asset can reach the destination asset by this technique". +Asset.exploit_to = model.Relationship(f"{Asset} exploits to {Asset}", short_name="exploit_to") +Asset.cred_to = model.Relationship(f"{Asset} reuses credentials to {Asset}", short_name="cred_to") +Asset.pivot_to = model.Relationship(f"{Asset} pivots to {Asset}", short_name="pivot_to") +# Technique-agnostic union edge: an attacker can move from src to dst by SOME +# technique. Used for the point query (any route between two named assets). +Asset.can_reach = model.Relationship(f"{Asset} can reach {Asset}", short_name="can_reach") + +# Load asset data. +asset_data = model.data(load_csv("assets.csv")) +model.define(Asset.new(id=asset_data["id"])) +model.where(Asset.id == asset_data["id"]).define( + Asset.name(asset_data["name"]), + Asset.zone(asset_data["zone"]), + Asset.internet_facing(asset_data["internet_facing"]), + Asset.crown_jewel(asset_data["crown_jewel"]), + Asset.exposure_score(asset_data["exposure_score"]), +) + +# Load attack steps. Each row is one technique-specific edge; split by technique to +# populate the three typed edges, and load every row into the can_reach union. +steps_df = load_csv("attack_steps.csv") +for technique, rel in [("exploit", "exploit_to"), ("cred", "cred_to"), ("pivot", "pivot_to")]: + sub = model.data(steps_df[steps_df["technique"] == technique][["src", "dst"]]) + u, v = Asset.ref(), Asset.ref() + model.where( + u.id == sub["src"], + v.id == sub["dst"], + ).define(getattr(u, rel)(v)) + +all_steps = model.data(steps_df[["src", "dst"]]) +ru, rv = Asset.ref(), Asset.ref() +model.where( + ru.id == all_steps["src"], + rv.id == all_steps["dst"], +).define(ru.can_reach(rv)) + +# -------------------------------------------------- +# Paths: kill-chain attack paths (multi-relationship sequence) +# PREVIEW capability; requires relationalai>=1.15. +# -------------------------------------------------- +# model.path(a.exploit_to, b.cred_to, c.pivot_to.repeat(1, MAX_PIVOTS), dst) is a +# MULTI-EDGE pattern: distinct relationships in series. It matches the kill-chain +# order -- one exploit hop, then one credential-reuse hop, then 1..MAX_PIVOTS +# lateral pivots -- ending at the explicit dst endpoint. Filtering the source to +# an internet-facing asset and dst to a crown jewel pins the threat model: an +# externally reachable foothold that ends on a high-value asset. + +print("=== Cybersecurity Attack Paths: kill-chain into crown jewels ===") + +a, b, c, dst = Asset.ref(), Asset.ref(), Asset.ref(), Asset.ref() +kill = model.path(a.exploit_to, b.cred_to, c.pivot_to.repeat(1, MAX_PIVOTS), dst).all_paths() +kill_df = ( + model.where( + kill, + a.internet_facing == "yes", + dst.crown_jewel == "yes", + ) + .select( + kill.alias("path_id"), + kill.nodes["index"].alias("step"), + Asset(kill.nodes).id.alias("asset_id"), + Asset(kill.nodes).name.alias("asset_name"), + ) + .to_df() +) +# Technique at each hop (the relationship label), as a separate projection. +hop_df = ( + model.where( + kill, + a.internet_facing == "yes", + dst.crown_jewel == "yes", + ) + .select( + kill.alias("path_id"), + kill.relationships["index"].alias("hop"), + kill.relationships["relationship"].alias("technique"), + ) + .to_df() +) + +kill_df["step"] = kill_df["step"].astype(int) +kill_df = kill_df.drop_duplicates(["path_id", "step"]).sort_values(["path_id", "step"]) +hop_df["hop"] = hop_df["hop"].astype(int) +hop_df = hop_df.drop_duplicates(["path_id", "hop"]).sort_values(["path_id", "hop"]) + +# Reassemble each kill-chain: ordered asset names + the technique used at each hop. +def technique_label(raw): + # relationship labels arrive as e.g. "-->"; strip to the verb stem. + return raw.strip("-<>⟨⟩→ ").replace("_to", "") + +chains = [] +for pid, g in kill_df.groupby("path_id"): + assets = list(g.sort_values("step")["asset_name"]) + techs = [technique_label(t) for t in + hop_df[hop_df["path_id"] == pid].sort_values("hop")["technique"]] + labelled = assets[0] + for nm, tech in zip(assets[1:], techs): + labelled += f" --[{tech}]--> {nm}" + chains.append({"path_id": pid, "hops": len(assets) - 1, "asset_ids": tuple( + g.sort_values("step")["asset_id"]), "labelled": labelled}) + +print(f"\n{len(chains)} kill-chain attack path(s) reach a crown jewel " + f"(exploit -> cred -> 1-{MAX_PIVOTS} pivots, from an internet-facing asset):") +for ch in sorted(chains, key=lambda c: c["hops"]): + print(f" [{ch['hops']} hops] {ch['labelled']}") + +# -------------------------------------------------- +# Point query: every route between one entry and one crown jewel +# -------------------------------------------------- +# Pin both endpoints by id and enumerate all simple routes between them over the +# technique-agnostic can_reach edge (any technique, 1..MAX_ROUTE_HOPS). This is the +# >=1.15 native point query -- src/dst unified to specific assets inside all_paths(). + +src_pt, dst_pt = Asset.ref(), Asset.ref() +route = model.path(src_pt.can_reach.repeat(1, MAX_ROUTE_HOPS), dst_pt).all_paths() +route_df = ( + model.where( + route, + src_pt.id == ENTRY_ASSET, + dst_pt.id == TARGET_ASSET, + ) + .select( + route.alias("path_id"), + route.nodes["index"].alias("step"), + Asset(route.nodes).id.alias("asset_id"), + ) + .to_df() +) +route_df["step"] = route_df["step"].astype(int) +route_df = route_df.drop_duplicates(["path_id", "step"]).sort_values(["path_id", "step"]) +routes = [] +for pid, g in route_df.groupby("path_id"): + seq = list(g.sort_values("step")["asset_id"]) + if len(set(seq)) == len(seq): # simple + routes.append(tuple(seq)) +routes = sorted(set(routes), key=len) + +print(f"\n=== Point query: all routes from {ENTRY_ASSET} to {TARGET_ASSET} " + f"(any technique, <= {MAX_ROUTE_HOPS} hops, simple) ===") +print(f" {len(routes)} route(s):") +for seq in routes: + print(" " + " -> ".join(seq)) + +# -------------------------------------------------- +# Rank kill-chains by total asset exposure +# -------------------------------------------------- +# The riskiest chain is the one whose assets carry the most exposure overall -- the +# first remediation target. Exposure is a per-asset score (a stand-in for a CVSS / +# attack-surface metric) summed across the chain's distinct assets. + +exposure = dict( + model.select(Asset.id.alias("id"), Asset.exposure_score.alias("e")).to_df() + .itertuples(index=False, name=None) +) +for ch in chains: + ch["total_exposure"] = sum(exposure[i] for i in ch["asset_ids"]) + +print("\n=== Kill-chains ranked by total asset exposure (highest = remediate first) ===") +for ch in sorted(chains, key=lambda c: c["total_exposure"], reverse=True): + print(f" exposure {ch['total_exposure']:>3} ({ch['hops']} hops) {ch['labelled']}") + +# -------------------------------------------------- +# Persist: flag assets that lie on a crown-jewel attack path +# -------------------------------------------------- +# Bind the result back onto the ontology so a downstream query can pull the assets +# on a kill-chain without re-enumerating paths. + +Asset.on_attack_path = model.Property(f"{Asset} on attack path {String:on_attack_path}") +on_path_ids = sorted({i for ch in chains for i in ch["asset_ids"]}) +flag_data = model.data(pd.DataFrame({"id": on_path_ids})) +fa = Asset.ref() +model.where(fa.id == flag_data["id"]).define(fa.on_attack_path("yes")) + +x = Asset.ref() +flagged = ( + model.where( + x.on_attack_path == "yes", + ) + .select(x.id.alias("id"), x.name.alias("name"), x.zone.alias("zone")) + .to_df() +) + +print("\n=== Assets on a crown-jewel attack path (Asset.on_attack_path persisted) ===") +print(f" {len(flagged)} of {len(exposure)} assets:") +for _, r in flagged.sort_values("id").iterrows(): + print(f" {r['id']:<8} {r['name']:<24} ({r['zone']})") diff --git a/v1/cybersecurity-attack-paths/data/assets.csv b/v1/cybersecurity-attack-paths/data/assets.csv new file mode 100644 index 0000000..fab2010 --- /dev/null +++ b/v1/cybersecurity-attack-paths/data/assets.csv @@ -0,0 +1,13 @@ +id,name,zone,internet_facing,crown_jewel,exposure_score +web-01,Public Web Server,dmz,yes,no,7 +vpn-01,VPN Gateway,dmz,yes,no,8 +mail-01,Mail Relay,dmz,yes,no,6 +app-01,Application Server,internal,no,no,5 +app-02,API Backend,internal,no,no,6 +jump-01,Jump Host,internal,no,no,4 +ws-01,Engineer Workstation,internal,no,no,5 +ws-02,Finance Workstation,internal,no,no,6 +fs-01,File Server,internal,no,no,4 +bkp-01,Backup Server,restricted,no,no,3 +dc-01,Domain Controller,restricted,no,yes,9 +db-01,Customer Database,restricted,no,yes,9 diff --git a/v1/cybersecurity-attack-paths/data/attack_steps.csv b/v1/cybersecurity-attack-paths/data/attack_steps.csv new file mode 100644 index 0000000..e79721b --- /dev/null +++ b/v1/cybersecurity-attack-paths/data/attack_steps.csv @@ -0,0 +1,17 @@ +src,dst,technique +web-01,app-01,exploit +vpn-01,jump-01,exploit +mail-01,ws-02,exploit +app-01,app-02,exploit +ws-01,fs-01,exploit +app-01,jump-01,cred +jump-01,dc-01,cred +ws-02,ws-01,cred +app-02,db-01,cred +jump-01,fs-01,cred +web-01,vpn-01,pivot +jump-01,bkp-01,pivot +ws-01,dc-01,pivot +fs-01,db-01,pivot +app-02,jump-01,pivot +bkp-01,db-01,pivot diff --git a/v1/cybersecurity-attack-paths/pyproject.toml b/v1/cybersecurity-attack-paths/pyproject.toml new file mode 100644 index 0000000..cf164c1 --- /dev/null +++ b/v1/cybersecurity-attack-paths/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "rai-template-cybersecurity-attack-paths" +version = "0.0.0" +description = "RelationalAI template: enumerate multi-step cyber attack chains across an asset graph by composing distinct attacker techniques in series with multi-edge path patterns" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "relationalai==1.15.0", + "pandas>=2.0.0", +] + +[tool.setuptools] +packages = [] diff --git a/v1/cybersecurity-attack-paths/runbook.md b/v1/cybersecurity-attack-paths/runbook.md new file mode 100644 index 0000000..b3ca887 --- /dev/null +++ b/v1/cybersecurity-attack-paths/runbook.md @@ -0,0 +1,56 @@ +# Cybersecurity Attack Paths — Analyst Runbook + +A security team wants to see how an external attacker could chain techniques to reach a crown-jewel system, not just a list of isolated vulnerabilities. The dataset is a 12-asset enterprise estate (perimeter hosts, internal services and workstations, two restricted crown jewels) connected by 16 directed attack steps, each tagged with a technique. The analysis enumerates end-to-end attack chains and ranks the ones that reach a crown jewel so the team knows which routes to cut first. + +```text +Asset graph (12 assets, 16 technique-tagged steps: exploit / cred / pivot) + │ + ▼ +/rai-graph-analysis — Path enumeration (multi-edge, relationalai>=1.15) + • kill-chain signature: exploit (from an internet-facing asset), + then credential reuse, then 1-2 lateral pivots, into a crown jewel -> 3 chains + • point query: every route from Public Web Server to Customer Database + (any technique, <= 6 hops, no asset revisited) -> 7 routes + • rank kill-chains by total Asset.exposure_score along each -> top = 28 + • persist Asset.on_attack_path -> 11 of 12 assets +``` + +These prompts are designed to run in order in a single session: later questions read ontology state (the technique edges, the persisted flags) written by earlier ones. + +--- + +## 1. Build the ontology + +**Prompt:** Build a RelationalAI ontology from `data/assets.csv` and `data/attack_steps.csv`. Each asset has an id, name, zone, whether it is internet-facing, whether it is a crown jewel, and an exposure score. Each attack step is a directed move from one asset to another tagged with a technique (`exploit`, `cred`, or `pivot`). Model one relationship per technique between assets, plus a technique-agnostic `can_reach` relationship populated from every step. + +**Response:** Loads 12 `Asset` nodes and four asset-to-asset relationships — `exploit_to`, `cred_to`, `pivot_to` (one per technique), and the union `can_reach`. The 16 steps split into the three typed edges and all 16 populate `can_reach`. + +## 2. Examine the ontology + +**Prompt:** What concepts and relationships does the ontology have? + +**Response:** One concept, `Asset`, with properties `name`, `zone`, `internet_facing`, `crown_jewel`, and `exposure_score`; and four self-relationships (`exploit_to`, `cred_to`, `pivot_to`, `can_reach`). + +## 3. Kill-chain attack paths into crown jewels + +**Prompt:** Which attack chains follow the full kill-chain signature — an exploit step starting from an internet-facing asset, then a credential-reuse step, then one or two lateral pivots — and end at a crown jewel (an asset flagged `crown_jewel`)? Show the technique used at each hop. + +**Response:** 3 kill-chains reach a crown jewel. VPN Gateway —exploit→ Jump Host —cred→ File Server —pivot→ Customer Database; Mail Relay —exploit→ Finance Workstation —cred→ Engineer Workstation —pivot→ Domain Controller; and a 4-hop chain Public Web Server —exploit→ Application Server —cred→ Jump Host —pivot→ Backup Server —pivot→ Customer Database. The technique order is enforced by the path pattern, so chains that move in a different order are correctly excluded. + +## 4. All routes between one entry and one crown jewel + +**Prompt:** Treating any technique as a possible move, what are all the routes an attacker on the Public Web Server (`web-01`) could take to reach the Customer Database (`db-01`) in at most six hops, without revisiting an asset? + +**Response:** 7 routes connect `web-01` to `db-01`, ranging from 3 hops (web-01, app-01, app-02, db-01) to 5 hops. Both endpoints are pinned by id, so this is a point query that returns the actual route sequences, not just whether a path exists. + +## 5. Which kill-chain to remediate first + +**Prompt:** Rank the crown-jewel kill-chains by the total exposure of the assets along each chain (the sum of each distinct asset's `exposure_score`), highest first, so we can prioritize remediation. + +**Response:** Highest total exposure is the Public Web Server chain at 28, then the Mail Relay chain at 26, then the VPN Gateway chain at 25. The Public Web Server chain is the first remediation target. + +## 6. Persist the assets on an attack path + +**Prompt:** Flag every asset that lies on one of the crown-jewel kill-chains by setting `Asset.on_attack_path` to `yes`, and list the flagged assets. + +**Response:** 11 of 12 assets are flagged on a crown-jewel kill-chain and persisted as `Asset.on_attack_path`. Only the API Backend (`app-02`) is not on a kill-chain — it appears in `web-01`-to-`db-01` routes but never in a chain that follows the exploit-then-credential-then-pivot order. From db7710dd995c0f146503c2a9a78361bafd92fde0 Mon Sep 17 00:00:00 2001 From: cafzal Date: Mon, 22 Jun 2026 15:21:02 -0700 Subject: [PATCH 14/14] templates: address pre-share review findings it-dependency-mapping README: the 'How it works' enumeration snippet was teaching the p.length-fanout anti-pattern the script avoids -- replaced with the script's actual select (no p.length) + dedupe + max(step) reassembly. Version refs: bump the two hard prereq contradictions (energy ==1.11.0, telco ==1.12.0) and the paths PREVIEW notes (>=1.13) to match the ==1.15.0 pin; kept the accurate 'introduced in 1.13' history and the >=1.12 member-mapping note. cybersecurity-attack-paths: dedupe the per-chain exposure sum (set()) to match the 'distinct assets' claim; anchor technique_label to a trailing '_to' suffix so a custom mid-string technique isn't mangled. bom-reachability: single pandas import style (pd.read_csv). All five py_compile + ruff clean; bom re-run unchanged (18 paths / 8 maximal); cybersecurity re-run unchanged (3 chains, exposure 28/26/25, 7 routes, 11/12). --- v1/bom-reachability/README.md | 2 +- v1/bom-reachability/bom_reachability.py | 9 ++++----- .../cybersecurity_attack_paths.py | 5 +++-- v1/energy_grid_planning/README.md | 4 ++-- v1/energy_grid_planning/energy_grid_planning.py | 4 ++-- v1/energy_grid_planning/runbook.md | 2 +- v1/it-dependency-mapping/README.md | 17 +++++++++-------- .../it_dependency_mapping.py | 2 +- v1/telco_network_recovery/README.md | 2 +- v1/telco_network_recovery/runbook.md | 2 +- .../telco_network_recovery.py | 4 ++-- 11 files changed, 27 insertions(+), 26 deletions(-) diff --git a/v1/bom-reachability/README.md b/v1/bom-reachability/README.md index abd3d51..3c2be02 100644 --- a/v1/bom-reachability/README.md +++ b/v1/bom-reachability/README.md @@ -144,7 +144,7 @@ betweenness = graph.betweenness_centrality() Components with high betweenness are structural bottlenecks -- disrupting them affects the most product lines. -### 5. Enumerate Assembly Paths (PREVIEW, requires `relationalai>=1.13`) +### 5. Enumerate Assembly Paths (PREVIEW, requires `relationalai>=1.15`) Where reachability returns dependency *pairs*, path enumeration returns the actual *build sequences*. It derives a SKU-to-SKU `feeds` edge from the `BillOfMaterials` intermediary (input SKU feeds output SKU) and enumerates every assembly path; because the BOM is acyclic, `.all_paths()` yields exactly the simple paths -- no cycle risk. A maximal-paths view keeps only the longest non-extendable chains, and the longest assembly depth is persisted as `SKU.assembly_depth`. diff --git a/v1/bom-reachability/bom_reachability.py b/v1/bom-reachability/bom_reachability.py index f9be0af..b0a757d 100644 --- a/v1/bom-reachability/bom_reachability.py +++ b/v1/bom-reachability/bom_reachability.py @@ -10,7 +10,7 @@ Algorithms: reachable(full=True) for transitive dependency tracing, betweenness_centrality() for identifying structural bottlenecks. -Assembly path enumeration (PREVIEW, requires relationalai>=1.13): enumerate the +Assembly path enumeration (PREVIEW, requires relationalai>=1.15): enumerate the bottom-up assembly chains that build each finished good. Derives a binary SKU->SKU "feeds into" edge from the BillOfMaterials intermediary (input feeds output), enumerates every assembly path with model.path(...).all_paths(), @@ -31,7 +31,6 @@ from pathlib import Path import pandas as pd -from pandas import read_csv from relationalai.semantics import Float, Integer, Model, String, where from relationalai.semantics.reasoners.graph import Graph @@ -49,7 +48,7 @@ SKU.type = model.Property(f"{SKU} has type {String:type}") SKU.category = model.Property(f"{SKU} in {String:category}") -sku_data = model.data(read_csv(data_dir / "skus.csv")) +sku_data = model.data(pd.read_csv(data_dir / "skus.csv")) model.define(SKU.new(id=sku_data["ID"])) where(SKU.id == sku_data["ID"]).define( SKU.name(sku_data["NAME"]), @@ -62,7 +61,7 @@ BillOfMaterials.output_sku = model.Relationship(f"{BillOfMaterials} produces {SKU}") BillOfMaterials.input_sku = model.Relationship(f"{BillOfMaterials} requires {SKU}") -bom_data = model.data(read_csv(data_dir / "bill_of_materials.csv")) +bom_data = model.data(pd.read_csv(data_dir / "bill_of_materials.csv")) model.define(BillOfMaterials.new(id=bom_data["ID"])) where(BillOfMaterials.id == bom_data["ID"]).define( BillOfMaterials.output_sku(SKU.lookup(id=bom_data["OUTPUT_SKU_ID"])), @@ -176,7 +175,7 @@ # -------------------------------------------------- # Assembly path enumeration -# PREVIEW capability; requires relationalai>=1.13. +# PREVIEW capability; requires relationalai>=1.15. # -------------------------------------------------- # Where betweenness scores a single *node*, this enumerates the full *chains* # that build each finished good: every bottom-up assembly path from a raw diff --git a/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py b/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py index 69e1aa8..55009c7 100644 --- a/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py +++ b/v1/cybersecurity-attack-paths/cybersecurity_attack_paths.py @@ -158,7 +158,8 @@ def load_csv(filename): # Reassemble each kill-chain: ordered asset names + the technique used at each hop. def technique_label(raw): # relationship labels arrive as e.g. "-->"; strip to the verb stem. - return raw.strip("-<>⟨⟩→ ").replace("_to", "") + stem = raw.strip("-<>⟨⟩→ ") + return stem[:-3] if stem.endswith("_to") else stem chains = [] for pid, g in kill_df.groupby("path_id"): @@ -225,7 +226,7 @@ def technique_label(raw): .itertuples(index=False, name=None) ) for ch in chains: - ch["total_exposure"] = sum(exposure[i] for i in ch["asset_ids"]) + ch["total_exposure"] = sum(exposure[i] for i in set(ch["asset_ids"])) print("\n=== Kill-chains ranked by total asset exposure (highest = remediate first) ===") for ch in sorted(chains, key=lambda c: c["total_exposure"], reverse=True): diff --git a/v1/energy_grid_planning/README.md b/v1/energy_grid_planning/README.md index f44227c..e599941 100644 --- a/v1/energy_grid_planning/README.md +++ b/v1/energy_grid_planning/README.md @@ -111,7 +111,7 @@ This is not a single-reasoner problem. Approving a data center at a structurally ### Tools - Python >= 3.10 -- RelationalAI Python SDK (`relationalai`) == 1.11.0 +- RelationalAI Python SDK (`relationalai`) == 1.15.0 ## Quickstart @@ -288,7 +288,7 @@ betweenness = grid_graph.betweenness_centrality() ### Stage 2.5: Paths -- Transmission Corridors & Contingency -> PREVIEW capability; requires `relationalai>=1.13`. +> PREVIEW capability; requires `relationalai>=1.15`. Where Stage 2 scores a *substation*, the **Graph** paths capability scores the *corridor* feeding each data center. It derives a bidirectional substation-to-substation edge from active transmission lines, enumerates generator-substation to DC-substation routes, and ranks each by the Stage 2 betweenness summed along its hops — the most fragile corridor is the one carrying the greatest through-traffic exposure. A contingency pass removes the highest-betweenness substation and re-enumerates to show which data centers reroute. The most-fragile load is persisted as `Substation.fragility_load`. diff --git a/v1/energy_grid_planning/energy_grid_planning.py b/v1/energy_grid_planning/energy_grid_planning.py index d6adec6..043b189 100644 --- a/v1/energy_grid_planning/energy_grid_planning.py +++ b/v1/energy_grid_planning/energy_grid_planning.py @@ -11,7 +11,7 @@ detection, and multi-metric centrality (betweenness, degree, eigenvector) on the transmission grid topology. Results are stored directly as Substation properties on the shared ontology. -- Stage 2.5 -- Paths (PREVIEW, relationalai>=1.13): enumerate generator-sub -> +- Stage 2.5 -- Paths (PREVIEW, relationalai>=1.15): enumerate generator-sub -> DC-sub transmission corridors, rank each by Stage 2 betweenness summed along the route (most fragile = greatest through-traffic exposure), and re-enumerate with the highest-betweenness substation offline. Persists Substation.fragility_load. @@ -657,7 +657,7 @@ def load_csv(filename): # -------------------------------------------------- # Stage 2.5: Paths -- Transmission Corridors & Contingency -# PREVIEW capability; requires relationalai>=1.13. +# PREVIEW capability; requires relationalai>=1.15. # -------------------------------------------------- # Composes on Stage 2's Substation.betweenness. Where Stage 2 scores a *node*, # paths scores the *corridor* feeding each data center: enumerate generator-sub -> diff --git a/v1/energy_grid_planning/runbook.md b/v1/energy_grid_planning/runbook.md index 19b0ceb..5df47c8 100644 --- a/v1/energy_grid_planning/runbook.md +++ b/v1/energy_grid_planning/runbook.md @@ -98,7 +98,7 @@ Plan routing sub-questions to predictive, graph, rules, and prescriptive reasone 1 connected component, 3 Louvain communities (North Texas, West Texas, Gulf Coast); DFW, Houston, San Antonio flagged `is_structurally_critical`; 7 of 10 DC requests target critical nodes. -### 5b. Trace fragile transmission corridors (PREVIEW, requires `relationalai>=1.13`) +### 5b. Trace fragile transmission corridors (PREVIEW, requires `relationalai>=1.15`) **Prompt** diff --git a/v1/it-dependency-mapping/README.md b/v1/it-dependency-mapping/README.md index 59031ef..e77276a 100644 --- a/v1/it-dependency-mapping/README.md +++ b/v1/it-dependency-mapping/README.md @@ -52,7 +52,7 @@ This template demonstrates **Graph** reasoning -- specifically variable-length p - Python >= 3.10 - A Snowflake account that has the RAI Native App installed. - A Snowflake user with permissions to access the RAI Native App. -- `relationalai >= 1.13` -- the path-traversal API is a preview capability introduced in 1.13. +- `relationalai >= 1.15` -- the path-traversal API is a preview capability (introduced in 1.13, validated on 1.15). ## Quickstart @@ -128,7 +128,7 @@ Feature.contributes_to = model.Relationship( ### 2. Enumerate Downstream Paths -`model.path(Feature.contributes_to.repeat(1, MAX_DEPTH))` describes a variable-length traversal of 1 to `MAX_DEPTH` `contributes_to` edges. `all_paths()` enumerates every such path. Each result is a `PathTraversal`: `p.length` is its hop count and `p.nodes` is the ordered sequence of features it visits: +`model.path(Feature.contributes_to.repeat(1, MAX_DEPTH))` describes a variable-length traversal of 1 to `MAX_DEPTH` `contributes_to` edges. `all_paths()` enumerates every such path. Each result is a `PathTraversal`; project `p.nodes` to get the ordered features it visits. Do not also select `p.length` alongside `p.nodes` -- that fans the node rows out; derive the hop count from the maximum node index instead: ```python p_pattern = model.path(Feature.contributes_to.repeat(1, MAX_DEPTH)) @@ -136,23 +136,24 @@ paths_df = ( model.where(p := p_pattern.all_paths()) .select( p.alias("path_id"), - p.length.alias("hops"), p.nodes["index"].alias("step"), Feature(p.nodes).id.alias("feature_id"), Feature(p.nodes).name.alias("feature_name"), ) .to_df() ) +paths_df["step"] = paths_df["step"].astype(int) +# Projecting p.nodes can emit duplicate (path, step) rows -- dedupe before reassembly. +paths_df = paths_df.drop_duplicates(["path_id", "step"]).sort_values(["path_id", "step"]) ``` -Each path arrives as one row per visited node. Grouping on the path-id column and ordering by node index reassembles the ordered chain: +Each path arrives as one row per visited node. Grouping on the path-id column reassembles the ordered chain; the hop count is the maximum node index: ```python chains = ( - paths_df.sort_values(["path_id", "step"]) - .groupby("path_id") + paths_df.groupby("path_id") .agg( - hops=("hops", "first"), + hops=("step", "max"), node_ids=("feature_id", lambda s: tuple(s)), chain=("feature_name", lambda s: " -> ".join(s)), ) @@ -232,7 +233,7 @@ The deepest chain runs five hops from a raw source all the way to a downstream d
Why does model.path(...) raise an AttributeError or ImportError? -- The path-traversal API is a preview capability introduced in `relationalai` 1.13. Confirm your installed version with `python -c "import relationalai; print(relationalai.__version__)"` and upgrade if it is older. +- The path-traversal API is a preview capability introduced in `relationalai` 1.13 and validated on 1.15. Confirm your installed version (>= 1.15) with `python -c "import relationalai; print(relationalai.__version__)"` and upgrade if it is older.
diff --git a/v1/it-dependency-mapping/it_dependency_mapping.py b/v1/it-dependency-mapping/it_dependency_mapping.py index 6e6310f..fc1bfcd 100644 --- a/v1/it-dependency-mapping/it_dependency_mapping.py +++ b/v1/it-dependency-mapping/it_dependency_mapping.py @@ -80,7 +80,7 @@ def load_csv(path): # -------------------------------------------------- # Paths: enumerate downstream dependency paths -# PREVIEW capability; requires relationalai>=1.13. +# PREVIEW capability; requires relationalai>=1.15. # -------------------------------------------------- # model.path(Feature.contributes_to.repeat(1, MAX_DEPTH)) describes a # variable-length traversal of 1..MAX_DEPTH contributes_to edges. all_paths() diff --git a/v1/telco_network_recovery/README.md b/v1/telco_network_recovery/README.md index 256ff9a..db8346a 100644 --- a/v1/telco_network_recovery/README.md +++ b/v1/telco_network_recovery/README.md @@ -59,7 +59,7 @@ Built using **predictive reasoning** (GNN on a heterogeneous graph), **rules-bas ### Tools - Python ≥ 3.10. -- RelationalAI Python SDK (`relationalai == 1.12.0`). +- RelationalAI Python SDK (`relationalai == 1.15.0`). ### One-time Snowflake setup for GNN experiment artifacts diff --git a/v1/telco_network_recovery/runbook.md b/v1/telco_network_recovery/runbook.md index 10dcddd..a601042 100644 --- a/v1/telco_network_recovery/runbook.md +++ b/v1/telco_network_recovery/runbook.md @@ -116,7 +116,7 @@ Four derived health properties (`avg_packet_loss`, `avg_latency_ms`, `avg_error_ `Subscriber.influence_score` (PageRank, 1,200 subs) plus two per-tower properties on the 142 critical towers: `CellTower.weighted_impact` (headline — sum of `Subscriber.customer_value = LTV × (1 + churn_risk_score)` over ACTIVE callers routed through; CDR-weighted so heavy callers count more than once) and `CellTower.weighted_pagerank` (secondary — sum of PageRank influence over the same set). The prescriptive MIP consumes `weighted_impact` in its objective. -### 6b. Trace the most-influential call paths (PREVIEW, requires `relationalai>=1.13`) +### 6b. Trace the most-influential call paths (PREVIEW, requires `relationalai>=1.15`) **Prompt** diff --git a/v1/telco_network_recovery/telco_network_recovery.py b/v1/telco_network_recovery/telco_network_recovery.py index 2357e56..1935262 100644 --- a/v1/telco_network_recovery/telco_network_recovery.py +++ b/v1/telco_network_recovery/telco_network_recovery.py @@ -30,7 +30,7 @@ of ACTIVE subscribers whose calls route through the tower; PageRank-weighted impact is exposed alongside as a secondary network-effect signal. -- Stage 3.5 -- Paths (PREVIEW, relationalai>=1.13): enumerate caller +- Stage 3.5 -- Paths (PREVIEW, relationalai>=1.15): enumerate caller -> callee call paths (<=3 hops, simple) from the highest-PageRank subscriber over an arity-3 `{Subscriber} via {CellTower} calls {Subscriber}` edge, recovering the routing tower on each hop via @@ -814,7 +814,7 @@ class SubscriberStatus(model.Enum): # -------------------------------------------------- # Stage 3.5: Paths -- Call-path enumeration through critical towers -# PREVIEW capability; requires relationalai>=1.13. +# PREVIEW capability; requires relationalai>=1.15. # -------------------------------------------------- # Composes on Stage 3's Subscriber.influence_score (PageRank) and Stage 2's # CellTower.is_critical_restore. Where PageRank scores a *node*, paths scores