Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ai/docs/gpu/dgx-spark-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ image with `--gpus all`. Key env-var defaults:
| `WITH_LINT` | `0` | Run ruff |
| `WITH_TYPECHECK` | `0` | Run mypy |

For `RAPIDS_VERSION=25.02`, the script injects a newer CUDA Python bridge
(`numba-cuda==0.22.2`, `cuda-python==12.9.5`, and matching CUDA helper
packages) during image build. The stock 25.02 image can initialize CUDA on
DGX Spark GB10 and run CuPy, but it segfaults on trivial cuDF host
materialization such as `cudf.DataFrame({"x": [1, 2]}).to_pandas()`.

The script **volume-mounts `graphistry/` read-only** at runtime, so you only
need to sync changed source files — no full image rebuild required for most
iterations.
Expand Down
24 changes: 20 additions & 4 deletions docker/test-rapids-official-local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,29 @@ resolve_profile_pip_deps() {
}

resolve_profile_pip_pre_deps() {
local profile="$1"
local rapids_version="$1"
local profile="$2"
local pre_deps=""

# RAPIDS 25.02 images ship numba-cuda 0.2.0 / cuda-python 12.8.0.
# On DGX Spark GB10 (compute capability 12.1), that stack can initialize
# CUDA and run CuPy, but segfaults on trivial cuDF host materialization
# such as cudf.DataFrame({"x": [1, 2]}).to_pandas(). Use the newer CUDA
# Python bridge from the working 26.02 image while keeping RAPIDS at 25.02.
if [[ "$rapids_version" == "25.02" ]]; then
pre_deps="numba-cuda==0.22.2 cuda-bindings==12.9.5 cuda-core==0.3.2 cuda-python==12.9.5"
fi

case "$profile" in
basic|gfql)
echo ""
echo "$pre_deps"
;;
ai)
echo "--index-url https://download.pytorch.org/whl/cpu torch==2.11.0+cpu"
if [[ -n "$pre_deps" ]]; then
echo "$pre_deps --extra-index-url https://download.pytorch.org/whl/cpu torch==2.11.0+cpu"
else
echo "--index-url https://download.pytorch.org/whl/cpu torch==2.11.0+cpu"
fi
;;
*)
echo "Unsupported PROFILE: ${profile}" >&2
Expand Down Expand Up @@ -129,7 +145,7 @@ if [[ -z "$PIP_DEPS" ]]; then
fi

if [[ -z "$PIP_PRE_DEPS" ]]; then
PIP_PRE_DEPS="$(resolve_profile_pip_pre_deps "$PROFILE")"
PIP_PRE_DEPS="$(resolve_profile_pip_pre_deps "$RAPIDS_VERSION" "$PROFILE")"
fi

if [[ -z "$TEST_FILES" ]]; then
Expand Down
22 changes: 22 additions & 0 deletions graphistry/compute/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,28 @@ def semi_apply_mark(
)


def join_apply(
    *,
    binding_ops: List[Dict[str, Any]],
    join_aliases: Sequence[str],
    how: str = "inner",
) -> ASTCall:
    """Build a ``join_apply`` call op: join active rows with rows produced by
    a correlated binding pattern.

    ``binding_ops`` encodes the right-hand pattern to evaluate as bindings rows.
    ``join_aliases`` names shared aliases used as join keys.
    ``how`` is ``"inner"`` or ``"left"``.
    """
    # Materialize aliases so callers may pass any Sequence (tuple, etc.).
    params: Dict[str, Any] = {
        "binding_ops": binding_ops,
        "join_aliases": list(join_aliases),
        "how": how,
    }
    return ASTCall("join_apply", params)


def order_by(keys: Iterable[Tuple[Any, str]]) -> ASTCall:
    """Create an ORDER BY operation for GFQL row pipelines."""
    # Materialize the iterable so the op carries a concrete key list.
    materialized_keys = [key for key in keys]
    return ASTCall("order_by", {"keys": materialized_keys})
Expand Down
12 changes: 12 additions & 0 deletions graphistry/compute/gfql/call/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,18 @@ def _semi_apply_mark_added_node_cols(params: Dict[str, object]) -> Set[str]:
schema_effects=_schema_effects(adds_node_cols=_semi_apply_mark_added_node_cols),
),

'join_apply': _method_entry(
allowed_params={'binding_ops', 'join_aliases', 'how'},
required_params={'binding_ops', 'join_aliases'},
param_validators={
'binding_ops': is_list_of_dicts,
'join_aliases': is_non_empty_list_of_strings,
'how': lambda v: v in {'inner', 'left'},
},
description='Join active rows with correlated binding rows',
schema_effects=NO_SCHEMA_EFFECTS,
),

'order_by': _method_entry(
allowed_params={'keys'},
required_params={'keys'},
Expand Down
89 changes: 89 additions & 0 deletions graphistry/compute/gfql/row/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def _gfql_cudf_list_sort_series_requires_host_bridge(series: Any) -> bool:
"rows",
"semi_apply_mark",
"anti_semi_apply",
"join_apply",
"where_rows",
"select",
"with_",
Expand Down Expand Up @@ -4170,6 +4171,94 @@ def semi_apply_mark(
out_df[out_col] = ~merged[marker_col].isna()
return self._gfql_row_table(out_df)

def join_apply(
    self,
    binding_ops: List[Dict[str, Any]],
    join_aliases: List[str],
    how: str = "inner",
) -> "Plottable":
    """Join active rows with rows produced by ``binding_ops``.

    :param binding_ops: non-empty list of serialized ops; evaluated against
        the base graph to produce the right-hand row table
    :param join_aliases: non-empty list of alias names shared by both sides,
        used to recover the join key columns
    :param how: ``"inner"`` or ``"left"``
    :returns: Plottable whose row table is the joined frame
    :raises ValueError: on malformed ``binding_ops``/``join_aliases``/``how``
    :raises GFQLValidationError: when no alias yields a column present on
        both the left and right row tables
    """
    # Eager argument validation with actionable error messages.
    if not isinstance(binding_ops, list) or len(binding_ops) == 0:
        raise ValueError("join_apply(binding_ops=...) requires a non-empty list")
    if not isinstance(join_aliases, list) or len(join_aliases) == 0:
        raise ValueError("join_apply(join_aliases=...) requires a non-empty list")
    if not all(isinstance(alias, str) and alias.strip() for alias in join_aliases):
        raise ValueError("join_apply(join_aliases=...) must be a list of non-empty strings")
    if how not in {"inner", "left"}:
        raise ValueError("join_apply(how=...) must be 'inner' or 'left'")

    left_df = self._gfql_get_active_table()
    # No active table at all: degrade to an empty row table.
    if left_df is None:
        return self._gfql_row_table(self._gfql_empty_frame())
    # Empty left side joins to empty for both 'inner' and 'left'; short-circuit.
    if len(left_df) == 0:
        return self._gfql_row_table(left_df.copy())

    # Resolve the graph to evaluate binding_ops against — presumably the
    # pre-pipeline base graph, falling back to the wrapped graph, then self
    # (TODO confirm fallback order matches pipeline setup).
    base_graph = getattr(self, "_gfql_rows_base_graph", None)
    if base_graph is None:
        base_graph = getattr(self, "_g", None)
    if base_graph is None:
        base_graph = self

    # Evaluate the right-hand pattern into a row table; its frame lives in _nodes.
    right_rows = _RowPipelineAdapter(cast("Plottable", base_graph))._gfql_binding_ops_row_table(binding_ops)
    right_df = getattr(right_rows, "_nodes", None)

    # Node id column of the base graph; default to "id" when unbound/blank.
    node_id_col = getattr(base_graph, "_node", None)
    if not isinstance(node_id_col, str) or node_id_col == "":
        node_id_col = "id"

    # Recover join key columns per alias: prefer the qualified
    # "<alias>.<node_id_col>" column, then the bare alias; a candidate only
    # qualifies when present on BOTH sides. De-dupe while preserving order.
    join_cols: List[str] = []
    missing_aliases: List[str] = []
    right_cols: List[str] = [] if right_df is None else list(right_df.columns)
    for alias in join_aliases:
        candidates = (f"{alias}.{node_id_col}", alias)
        chosen = next(
            (
                col
                for col in candidates
                if col in left_df.columns
                and right_df is not None
                and col in right_df.columns
            ),
            None,
        )
        if chosen is None:
            missing_aliases.append(alias)
            continue
        if chosen not in join_cols:
            join_cols.append(chosen)

    # No shared alias column at all: the join is unsatisfiable — fail loudly
    # rather than producing a cross join or empty result silently.
    if not join_cols:
        raise GFQLValidationError(
            ErrorCode.E108,
            "GFQL row join could not recover shared alias join columns",
            field="join_apply",
            value={"join_aliases": join_aliases, "missing_aliases": missing_aliases},
            suggestion="Join on aliases present in both the active row table and binding_ops rows.",
            language="gfql",
        )

    # Carry the join keys plus right-only payload columns; dropping right
    # columns that collide with left names avoids pandas' _x/_y suffixing.
    right_payload_cols = [
        col
        for col in right_cols
        if col in join_cols or col not in left_df.columns
    ]
    # Empty/absent right side: inner join yields zero rows (keep left schema);
    # left join keeps all left rows and null-fills new right payload columns.
    if right_df is None or len(right_df) == 0:
        if how == "inner":
            return self._gfql_row_table(left_df.iloc[0:0].copy())
        out_df = left_df.copy()
        for col in right_payload_cols:
            if col not in out_df.columns:
                out_df = out_df.assign(**{col: self._gfql_broadcast_scalar(out_df, None)})
        return self._gfql_row_table(out_df)

    # General case: single merge on the recovered keys; sort=False preserves
    # the left row order as much as pandas allows.
    joined = left_df.merge(
        right_df[right_payload_cols],
        on=join_cols,
        how=how,
        sort=False,
    )
    return self._gfql_row_table(joined)

def return_(self, items: List[Any]) -> "Plottable":
    """Project the listed RETURN items; thin alias for :meth:`select`."""
    projected = self.select(items)
    return projected

Expand Down
97 changes: 97 additions & 0 deletions graphistry/tests/compute/gfql/cypher/test_lowering.py
Original file line number Diff line number Diff line change
Expand Up @@ -2535,6 +2535,103 @@ def test_string_cypher_supports_is1_seed_city_projection_shape() -> None:
]


def test_issue_1411_connected_join_property_projection_shape() -> None:
    """Issue #1411: connected-pattern join, projecting individual properties."""
    node_records = [
        {
            "id": "p1",
            "labels": ["Person"],
            "label__Person": True,
            "firstName": "Seed",
            "name": None,
        },
        {
            "id": "p2",
            "labels": ["Person"],
            "label__Person": True,
            "firstName": "Friend",
            "name": None,
        },
        {
            "id": "c1",
            "labels": ["Place"],
            "label__Place": True,
            "firstName": None,
            "name": "City",
        },
    ]
    node_df = pd.DataFrame.from_records(node_records)
    edge_df = pd.DataFrame(
        {
            "s": ["p1", "p2"],
            "d": ["c1", "c1"],
            "type": ["IS_LOCATED_IN", "IS_LOCATED_IN"],
        }
    )

    query = (
        "MATCH "
        "(person:Person {id: 'p1'})-[:IS_LOCATED_IN]->(city:Place), "
        "(friend:Person)-[:IS_LOCATED_IN]->(city) "
        "RETURN friend.id AS friendId, friend.firstName AS friendFirstName, city.name AS cityName "
        "ORDER BY friendId"
    )
    result = _mk_graph(node_df, edge_df).gfql(query)

    expected = [
        {"friendId": "p1", "friendFirstName": "Seed", "cityName": "City"},
        {"friendId": "p2", "friendFirstName": "Friend", "cityName": "City"},
    ]
    assert result._nodes.to_dict(orient="records") == expected


def test_issue_1411_connected_join_whole_row_projection_shape() -> None:
    """Issue #1411: connected-pattern join, projecting a whole entity row."""
    node_records = [
        {
            "id": "p1",
            "labels": ["Person"],
            "label__Person": True,
            "firstName": "Seed",
            "name": None,
        },
        {
            "id": "p2",
            "labels": ["Person"],
            "label__Person": True,
            "firstName": "Friend",
            "name": None,
        },
        {
            "id": "c1",
            "labels": ["Place"],
            "label__Place": True,
            "firstName": None,
            "name": "City",
        },
    ]
    node_df = pd.DataFrame.from_records(node_records)
    edge_df = pd.DataFrame(
        {
            "s": ["p1", "p2"],
            "d": ["c1", "c1"],
            "type": ["IS_LOCATED_IN", "IS_LOCATED_IN"],
        }
    )

    query = (
        "MATCH "
        "(person:Person {id: 'p1'})-[:IS_LOCATED_IN]->(city:Place), "
        "(friend:Person {id: 'p2'})-[:IS_LOCATED_IN]->(city) "
        "RETURN city"
    )
    result = _mk_graph(node_df, edge_df).gfql(query)

    # Whole-entity projection renders a summary string per row.
    assert result._nodes.to_dict(orient="records") == [
        {"city": "(:Place {name: 'City'})"}
    ]
    # Entity projection metadata should still point back at the source nodes.
    entity_meta = getattr(result, "_cypher_entity_projection_meta")
    city_meta = entity_meta["city"]
    assert city_meta["table"] == "nodes"
    assert city_meta["alias"] == "city"
    assert city_meta["id_column"] == "id"
    assert city_meta["ids"].tolist() == ["c1"]


def test_string_cypher_supports_is3_seed_expand_projection_shape() -> None:
nodes = pd.DataFrame(
[
Expand Down
Loading
Loading