diff --git a/ai/docs/gpu/dgx-spark-testing.md b/ai/docs/gpu/dgx-spark-testing.md index 9c83011001..8f08facd73 100644 --- a/ai/docs/gpu/dgx-spark-testing.md +++ b/ai/docs/gpu/dgx-spark-testing.md @@ -30,6 +30,12 @@ image with `--gpus all`. Key env-var defaults: | `WITH_LINT` | `0` | Run ruff | | `WITH_TYPECHECK` | `0` | Run mypy | +For `RAPIDS_VERSION=25.02`, the script injects a newer CUDA Python bridge +(`numba-cuda==0.22.2`, `cuda-python==12.9.5`, and matching CUDA helper +packages) during image build. The stock 25.02 image can initialize CUDA on +DGX Spark GB10 and run CuPy, but it segfaults on trivial cuDF host +materialization such as `cudf.DataFrame({"x": [1, 2]}).to_pandas()`. + The script **volume-mounts `graphistry/` read-only** at runtime, so you only need to sync changed source files — no full image rebuild required for most iterations. diff --git a/docker/test-rapids-official-local.sh b/docker/test-rapids-official-local.sh index a214478199..ef5caedecd 100755 --- a/docker/test-rapids-official-local.sh +++ b/docker/test-rapids-official-local.sh @@ -37,13 +37,29 @@ resolve_profile_pip_deps() { } resolve_profile_pip_pre_deps() { - local profile="$1" + local rapids_version="$1" + local profile="$2" + local pre_deps="" + + # RAPIDS 25.02 images ship numba-cuda 0.2.0 / cuda-python 12.8.0. + # On DGX Spark GB10 (compute capability 12.1), that stack can initialize + # CUDA and run CuPy, but segfaults on trivial cuDF host materialization + # such as cudf.DataFrame({"x": [1, 2]}).to_pandas(). Use the newer CUDA + # Python bridge from the working 26.02 image while keeping RAPIDS at 25.02. + if [[ "$rapids_version" == "25.02" ]]; then + pre_deps="numba-cuda==0.22.2 cuda-bindings==12.9.5 cuda-core==0.3.2 cuda-python==12.9.5" + fi + case "$profile" in basic|gfql) - echo "" + echo "$pre_deps" ;; ai) - echo "--index-url https://download.pytorch.org/whl/cpu torch==2.11.0+cpu" + if [[ -n "$pre_deps" ]]; then + echo "$pre_deps --extra-index-url https://download.pytorch.org/whl/cpu torch==2.11.0+cpu" + else + echo "--index-url https://download.pytorch.org/whl/cpu torch==2.11.0+cpu" + fi ;; *) echo "Unsupported PROFILE: ${profile}" >&2 @@ -129,7 +145,7 @@ if [[ -z "$PIP_DEPS" ]]; then fi if [[ -z "$PIP_PRE_DEPS" ]]; then - PIP_PRE_DEPS="$(resolve_profile_pip_pre_deps "$PROFILE")" + PIP_PRE_DEPS="$(resolve_profile_pip_pre_deps "$RAPIDS_VERSION" "$PROFILE")" fi if [[ -z "$TEST_FILES" ]]; then diff --git a/graphistry/compute/ast.py b/graphistry/compute/ast.py index 49abe6adaa..8f5c0d9779 100644 --- a/graphistry/compute/ast.py +++ b/graphistry/compute/ast.py @@ -1786,6 +1786,28 @@ def semi_apply_mark( ) +def join_apply( + *, + binding_ops: List[Dict[str, Any]], + join_aliases: Sequence[str], + how: str = "inner", +) -> ASTCall: + """Join active rows with rows produced by a correlated binding pattern. + + ``binding_ops`` encodes the right-hand pattern to evaluate as bindings rows. + ``join_aliases`` names shared aliases used as join keys. + ``how`` is ``"inner"`` or ``"left"``. + """ + return ASTCall( + "join_apply", + { + "binding_ops": binding_ops, + "join_aliases": list(join_aliases), + "how": how, + }, + ) + + def order_by(keys: Iterable[Tuple[Any, str]]) -> ASTCall: """Create an ORDER BY operation for GFQL row pipelines.""" return ASTCall("order_by", {"keys": list(keys)}) diff --git a/graphistry/compute/gfql/call/validation.py b/graphistry/compute/gfql/call/validation.py index d5fa956178..76e60c0295 100644 --- a/graphistry/compute/gfql/call/validation.py +++ b/graphistry/compute/gfql/call/validation.py @@ -294,6 +294,18 @@ def _semi_apply_mark_added_node_cols(params: Dict[str, object]) -> Set[str]: schema_effects=_schema_effects(adds_node_cols=_semi_apply_mark_added_node_cols), ), + 'join_apply': _method_entry( + allowed_params={'binding_ops', 'join_aliases', 'how'}, + required_params={'binding_ops', 'join_aliases'}, + param_validators={ + 'binding_ops': is_list_of_dicts, + 'join_aliases': is_non_empty_list_of_strings, + 'how': lambda v: v in {'inner', 'left'}, + }, + description='Join active rows with correlated binding rows', + schema_effects=NO_SCHEMA_EFFECTS, + ), + 'order_by': _method_entry( allowed_params={'keys'}, required_params={'keys'}, diff --git a/graphistry/compute/gfql/row/pipeline.py b/graphistry/compute/gfql/row/pipeline.py index c977eae65a..07062cce87 100644 --- a/graphistry/compute/gfql/row/pipeline.py +++ b/graphistry/compute/gfql/row/pipeline.py @@ -143,6 +143,7 @@ def _gfql_cudf_list_sort_series_requires_host_bridge(series: Any) -> bool: "rows", "semi_apply_mark", "anti_semi_apply", + "join_apply", "where_rows", "select", "with_", @@ -4170,6 +4171,94 @@ def semi_apply_mark( out_df[out_col] = ~merged[marker_col].isna() return self._gfql_row_table(out_df) + def join_apply( + self, + binding_ops: List[Dict[str, Any]], + join_aliases: List[str], + how: str = "inner", + ) -> "Plottable": + """Join active rows with rows produced by ``binding_ops``.""" + if not isinstance(binding_ops, list) or len(binding_ops) == 0: + raise ValueError("join_apply(binding_ops=...) requires a non-empty list") + if not isinstance(join_aliases, list) or len(join_aliases) == 0: + raise ValueError("join_apply(join_aliases=...) requires a non-empty list") + if not all(isinstance(alias, str) and alias.strip() for alias in join_aliases): + raise ValueError("join_apply(join_aliases=...) must be a list of non-empty strings") + if how not in {"inner", "left"}: + raise ValueError("join_apply(how=...) must be 'inner' or 'left'") + + left_df = self._gfql_get_active_table() + if left_df is None: + return self._gfql_row_table(self._gfql_empty_frame()) + if len(left_df) == 0: + return self._gfql_row_table(left_df.copy()) + + base_graph = getattr(self, "_gfql_rows_base_graph", None) + if base_graph is None: + base_graph = getattr(self, "_g", None) + if base_graph is None: + base_graph = self + + right_rows = _RowPipelineAdapter(cast("Plottable", base_graph))._gfql_binding_ops_row_table(binding_ops) + right_df = getattr(right_rows, "_nodes", None) + + node_id_col = getattr(base_graph, "_node", None) + if not isinstance(node_id_col, str) or node_id_col == "": + node_id_col = "id" + + join_cols: List[str] = [] + missing_aliases: List[str] = [] + right_cols: List[str] = [] if right_df is None else list(right_df.columns) + for alias in join_aliases: + candidates = (f"{alias}.{node_id_col}", alias) + chosen = next( + ( + col + for col in candidates + if col in left_df.columns + and right_df is not None + and col in right_df.columns + ), + None, + ) + if chosen is None: + missing_aliases.append(alias) + continue + if chosen not in join_cols: + join_cols.append(chosen) + + if not join_cols: + raise GFQLValidationError( + ErrorCode.E108, + "GFQL row join could not recover shared alias join columns", + field="join_apply", + value={"join_aliases": join_aliases, "missing_aliases": missing_aliases}, + suggestion="Join on aliases present in both the active row table and binding_ops rows.", + language="gfql", + ) + + right_payload_cols = [ + col + for col in right_cols + if col in join_cols or col not in left_df.columns + ] + if right_df is None or len(right_df) == 0: + if how == "inner": + return self._gfql_row_table(left_df.iloc[0:0].copy()) + out_df = left_df.copy() + for col in right_payload_cols: + if col not in out_df.columns: + out_df = out_df.assign(**{col: self._gfql_broadcast_scalar(out_df, None)}) + return self._gfql_row_table(out_df) + + joined = left_df.merge( + right_df[right_payload_cols], + on=join_cols, + how=how, + sort=False, + ) + return self._gfql_row_table(joined) + def return_(self, items: List[Any]) -> "Plottable": return self.select(items) diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index 5ac8758a4b..0e4d50d053 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -2535,6 +2535,103 @@ def test_string_cypher_supports_is1_seed_city_projection_shape() -> None: ] +def test_issue_1411_connected_join_property_projection_shape() -> None: + nodes = pd.DataFrame( + [ + { + "id": "p1", + "labels": ["Person"], + "label__Person": True, + "firstName": "Seed", + "name": None, + }, + { + "id": "p2", + "labels": ["Person"], + "label__Person": True, + "firstName": "Friend", + "name": None, + }, + { + "id": "c1", + "labels": ["Place"], + "label__Place": True, + "firstName": None, + "name": "City", + }, + ] + ) + edges = pd.DataFrame( + [ + {"s": "p1", "d": "c1", "type": "IS_LOCATED_IN"}, + {"s": "p2", "d": "c1", "type": "IS_LOCATED_IN"}, + ] + ) + + result = _mk_graph(nodes, edges).gfql( + "MATCH " + "(person:Person {id: 'p1'})-[:IS_LOCATED_IN]->(city:Place), " + "(friend:Person)-[:IS_LOCATED_IN]->(city) " + "RETURN friend.id AS friendId, friend.firstName AS friendFirstName, city.name AS cityName " + "ORDER BY friendId" + ) + + assert result._nodes.to_dict(orient="records") == [ + {"friendId": "p1", "friendFirstName": "Seed", "cityName": "City"}, + {"friendId": "p2", "friendFirstName": "Friend", "cityName": "City"}, + ] + + +def test_issue_1411_connected_join_whole_row_projection_shape() -> None: + nodes = pd.DataFrame( + [ + { + "id": "p1", + "labels": ["Person"], + "label__Person": True, + "firstName": "Seed", + "name": None, + }, + { + "id": "p2", + "labels": ["Person"], + "label__Person": True, + "firstName": "Friend", + "name": None, + }, + { + "id": "c1", + "labels": ["Place"], + "label__Place": True, + "firstName": None, + "name": "City", + }, + ] + ) + edges = pd.DataFrame( + [ + {"s": "p1", "d": "c1", "type": "IS_LOCATED_IN"}, + {"s": "p2", "d": "c1", "type": "IS_LOCATED_IN"}, + ] + ) + + result = _mk_graph(nodes, edges).gfql( + "MATCH " + "(person:Person {id: 'p1'})-[:IS_LOCATED_IN]->(city:Place), " + "(friend:Person {id: 'p2'})-[:IS_LOCATED_IN]->(city) " + "RETURN city" + ) + + assert result._nodes.to_dict(orient="records") == [ + {"city": "(:Place {name: 'City'})"} + ] + entity_meta = getattr(result, "_cypher_entity_projection_meta") + assert entity_meta["city"]["table"] == "nodes" + assert entity_meta["city"]["alias"] == "city" + assert entity_meta["city"]["id_column"] == "id" + assert entity_meta["city"]["ids"].tolist() == ["c1"] + + def test_string_cypher_supports_is3_seed_expand_projection_shape() -> None: nodes = pd.DataFrame( [ diff --git a/graphistry/tests/test_compute_chain.py b/graphistry/tests/test_compute_chain.py index 0d3ab1796c..e8f62016c3 100644 --- a/graphistry/tests/test_compute_chain.py +++ b/graphistry/tests/test_compute_chain.py @@ -5,7 +5,7 @@ from common import NoAuthTestCase import pytest -from graphistry.compute.ast import n, e_forward, e_reverse, e_undirected, is_in, rows, select +from graphistry.compute.ast import n, e_forward, e_reverse, e_undirected, is_in, join_apply, rows, select from graphistry.compute.chain import _inject_binding_ops_if_needed from graphistry.compute.exceptions import GFQLValidationError from graphistry.tests.test_compute import CGFull @@ -854,8 +854,16 @@ class TestChainBindingsTable(NoAuthTestCase): def _mk_graph(self, nodes_df, edges_df): return CGFull().nodes(nodes_df, "id").edges(edges_df, "s", "d") - def _mk_cudf_graph(self, nodes_df, edges_df): + def _require_cudf_runtime(self): cudf = pytest.importorskip("cudf") + try: + cudf.Series([1]) + except Exception as exc: + pytest.skip(f"cudf installed but runtime unavailable: {exc}") + return cudf + + def _mk_cudf_graph(self, nodes_df, edges_df): + cudf = self._require_cudf_runtime() return CGFull().nodes(cudf.from_pandas(nodes_df), "id").edges(cudf.from_pandas(edges_df), "s", "d") def _to_binding_ops(self, match_ops): @@ -1233,6 +1241,247 @@ def test_native_chain_rows_select_undirected_edge_alias_projection(self): } ] + def test_issue_1411_join_apply_projects_joined_message_rows(self): + """#1411: direct GFQL should join active friend rows to message rows.""" + g = self._mk_graph( + pd.DataFrame( + [ + { + "id": "seed", + "label__Person": True, + "label__Message": False, + "firstName": "Seed", + "creationDate": None, + }, + { + "id": "friend1", + "label__Person": True, + "label__Message": False, + "firstName": "Ada", + "creationDate": None, + }, + { + "id": "friend2", + "label__Person": True, + "label__Message": False, + "firstName": "Bea", + "creationDate": None, + }, + { + "id": "m1", + "label__Person": False, + "label__Message": True, + "firstName": None, + "creationDate": 20, + }, + { + "id": "m2", + "label__Person": False, + "label__Message": True, + "firstName": None, + "creationDate": 10, + }, + ] + ), + pd.DataFrame( + [ + {"s": "seed", "d": "friend1", "type": "KNOWS"}, + {"s": "seed", "d": "friend2", "type": "KNOWS"}, + {"s": "m1", "d": "friend1", "type": "HAS_CREATOR"}, + {"s": "m2", "d": "friend2", "type": "HAS_CREATOR"}, + ] + ), + ) + friend_ops = [ + n({"id": "seed", "label__Person": True}, name="seed"), + e_undirected({"type": "KNOWS"}), + n({"label__Person": True}, name="friend"), + ] + message_ops = [ + n({"label__Message": True}, name="message"), + e_forward({"type": "HAS_CREATOR"}), + n({"label__Person": True}, name="friend"), + ] + + result = g.gfql([ + *friend_ops, + rows(), + join_apply(binding_ops=self._to_binding_ops(message_ops), join_aliases=["friend"]), + select([ + ("friendId", "friend.id"), + ("friendFirstName", "friend.firstName"), + ("messageId", "message.id"), + ("messageCreationDate", "message.creationDate"), + ]), + ]) + rows_out = ( + result._nodes + .sort_values("messageCreationDate", ascending=False) + .to_dict(orient="records") + ) + + assert rows_out == [ + { + "friendId": "friend1", + "friendFirstName": "Ada", + "messageId": "m1", + "messageCreationDate": 20.0, + }, + { + "friendId": "friend2", + "friendFirstName": "Bea", + "messageId": "m2", + "messageCreationDate": 10.0, + }, + ] + + def test_issue_1411_join_apply_projects_joined_message_rows_on_cudf(self): + """#1411: row joins should keep direct GFQL projection engine-polymorphic.""" + cudf = self._require_cudf_runtime() + from cudf.testing import assert_frame_equal + + g = self._mk_cudf_graph( + pd.DataFrame( + [ + { + "id": "seed", + "label__Person": True, + "label__Message": False, + "firstName": "Seed", + "creationDate": None, + }, + { + "id": "friend1", + "label__Person": True, + "label__Message": False, + "firstName": "Ada", + "creationDate": None, + }, + { + "id": "friend2", + "label__Person": True, + "label__Message": False, + "firstName": "Bea", + "creationDate": None, + }, + { + "id": "m1", + "label__Person": False, + "label__Message": True, + "firstName": None, + "creationDate": 20, + }, + { + "id": "m2", + "label__Person": False, + "label__Message": True, + "firstName": None, + "creationDate": 10, + }, + ] + ), + pd.DataFrame( + [ + {"s": "seed", "d": "friend1", "type": "KNOWS"}, + {"s": "seed", "d": "friend2", "type": "KNOWS"}, + {"s": "m1", "d": "friend1", "type": "HAS_CREATOR"}, + {"s": "m2", "d": "friend2", "type": "HAS_CREATOR"}, + ] + ), + ) + friend_ops = [ + n({"id": "seed", "label__Person": True}, name="seed"), + e_undirected({"type": "KNOWS"}), + n({"label__Person": True}, name="friend"), + ] + message_ops = [ + n({"label__Message": True}, name="message"), + e_forward({"type": "HAS_CREATOR"}), + n({"label__Person": True}, name="friend"), + ] + + result = g.gfql([ + *friend_ops, + rows(), + join_apply(binding_ops=self._to_binding_ops(message_ops), join_aliases=["friend"]), + select([ + ("friendId", "friend.id"), + ("friendFirstName", "friend.firstName"), + ("messageId", "message.id"), + ("messageCreationDate", "message.creationDate"), + ]), + ]) + rows_out = ( + result._nodes + .sort_values("messageCreationDate", ascending=False) + .reset_index(drop=True) + ) + + expected = cudf.DataFrame([ + { + "friendId": "friend1", + "friendFirstName": "Ada", + "messageId": "m1", + "messageCreationDate": 20.0, + }, + { + "friendId": "friend2", + "friendFirstName": "Bea", + "messageId": "m2", + "messageCreationDate": 10.0, + }, + ]) + assert_frame_equal(rows_out, expected, check_dtype=False) + + def test_issue_1411_join_apply_left_preserves_unmatched_rows(self): + """Left row joins should keep active rows without a correlated match.""" + g = self._mk_graph( + pd.DataFrame( + [ + {"id": "seed", "label__Person": True, "label__Message": False, "firstName": "Seed"}, + {"id": "friend1", "label__Person": True, "label__Message": False, "firstName": "Ada"}, + {"id": "friend2", "label__Person": True, "label__Message": False, "firstName": "Bea"}, + {"id": "m1", "label__Person": False, "label__Message": True, "firstName": None}, + ] + ), + pd.DataFrame( + [ + {"s": "seed", "d": "friend1", "type": "KNOWS"}, + {"s": "seed", "d": "friend2", "type": "KNOWS"}, + {"s": "m1", "d": "friend1", "type": "HAS_CREATOR"}, + ] + ), + ) + friend_ops = [ + n({"id": "seed", "label__Person": True}, name="seed"), + e_undirected({"type": "KNOWS"}), + n({"label__Person": True}, name="friend"), + ] + message_ops = [ + n({"label__Message": True}, name="message"), + e_forward({"type": "HAS_CREATOR"}), + n({"label__Person": True}, name="friend"), + ] + + result = g.gfql([ + *friend_ops, + rows(), + join_apply( + binding_ops=self._to_binding_ops(message_ops), + join_aliases=["friend"], + how="left", + ), + select([ + ("friendId", "friend.id"), + ("messageId", "message.id"), + ]), + ]) + rows_out = result._nodes.sort_values("friendId").to_dict(orient="records") + + assert rows_out[0] == {"friendId": "friend1", "messageId": "m1"} + assert rows_out[1]["friendId"] == "friend2" + assert pd.isna(rows_out[1]["messageId"]) + def test_native_chain_rows_bindings_edge_alias(self): """#982: edge alias properties should be accessible.""" g = self._mk_graph(