From 35695d51e775ea548d39a7552a8a839492e12a7d Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Mon, 11 May 2026 00:44:00 -0700 Subject: [PATCH 1/2] Fix #1413 IC3 carried row reentry --- CHANGELOG.md | 1 + bin/ci_cypher_surface_guard_baseline.json | 2 +- graphistry/compute/gfql/cypher/lowering.py | 269 +++++++++++++----- graphistry/compute/gfql/cypher/parser.py | 6 +- .../gfql/cypher/reentry/compiletime.py | 10 +- .../compute/gfql/cypher/reentry/rewrite.py | 17 ++ graphistry/compute/gfql/row/pipeline.py | 54 +++- .../compute/gfql/cypher/test_lowering.py | 74 +++++ 8 files changed, 334 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f447f0a84..9a3d138a00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - **GFQL / Cypher ORDER BY on stringified-list columns uses Cypher list-orderability (#1359, meta #1353 item #1)**: When a list-valued property is stored as a string column (e.g. round-tripped through CSV / Arrow string columns), `ORDER BY` previously fell back to lex string sort, which mishandles negative numbers because `"-"` < `"2"` in ASCII (e.g. `"[1, -20]"` sorted before `"[1, 2]"`). Added `order_detect_stringified_list_series` + `parse_stringified_list_series` in `graphistry/compute/gfql/row/ordering.py`, and routed the row pipeline through `build_list_sort_columns` after `ast.literal_eval`-parsing the string entries when the column is fully list-shaped (`^\[.*\]$` per-row). Python-list-typed columns continue through the existing list-aware path unchanged. Includes pygraphistry-side regression coverage on both Python-list and stringified-list inputs (`test_string_cypher_order_by_python_list_column_uses_list_orderability`, `test_string_cypher_order_by_stringified_list_column_uses_list_orderability`). The matching TCK port-level fixture/runner fixes that flip the 14 `with-orderBy` wrong-row scenarios to `success_matches_expected` are tracked in [tck-gfql #36](https://github.com/graphistry/tck-gfql/issues/36). ### Fixed +- **GFQL / Cypher IC3 carried-row reentry joined aggregation (#1413, #880)**: Cypher lowering now admits the IC3/cross-country-style `WITH person, collect(city) AS cities MATCH ...` bounded-reentry shape with multiple post-reentry `WITH` stages, carries the collected city list alongside the `person` whole-row alias, and evaluates whole-row node membership such as `NOT friendCity IN cities` against collected entity lists. Added adversarial regressions for `collect(city)` and `collect(DISTINCT city)` with searched CASE aggregation and post-aggregate filtering in `graphistry/tests/compute/gfql/cypher/test_lowering.py`. - **GFQL / Cypher joined-row aggregation CASE chained-comparison lowering (#1413, #880)**: Cypher lowering now rewrites chained comparisons inside searched `CASE WHEN ... THEN` conditions before row-expression validation, so LDBC IC4/new-topics-style joined-row aggregation queries using `$startDate <= post.creationDate < $endDate` no longer fail the local GFQL subset gate. The rewrite is constrained to unquoted CASE conditions, preserves unrelated CASE comparison bodies, and adds adversarial regression coverage for multiple chained CASE arms and multiple joined-row aggregation CASE flags in `graphistry/tests/compute/gfql/cypher/test_lowering.py`. - **GFQL / Cypher temporal historical named-zone canonicalization + comparison parity (`#1406`, `#1353`)**: Direct-Cypher datetime canonicalization now applies Neo4j/TCK-compatible historical timezone offsets for pre-standard-time `Europe/Stockholm` named-zone literals in `graphistry/compute/gfql/temporal_text.py` (notably `1818-07-21` -> `+00:53:28`). This closes the residual wrong-row case `expr-temporal2-6-5` and keeps equality/comparison behavior consistent when one side is zone-derived and the other is explicit offset text. Added focused regression coverage in `graphistry/tests/compute/gfql/cypher/test_temporal_text.py` and `graphistry/tests/compute/gfql/cypher/test_lowering.py`. - **GFQL / Cypher structural list/map equality now preserves null-unknown semantics (#1405, #1353)**: Direct-Cypher and row-pipeline comparison evaluation in `graphistry/compute/gfql/row/pipeline.py` now uses recursive tri-valued structural equality for list/map families under `=`, `!=`, and `<>`, so nested null comparisons return `null` instead of collapsing to Python `true/false` (for example `[[1], [2]] = [[1], [null]]`, `{k: null} = {k: null}`). Added regression coverage in `graphistry/tests/compute/test_gfql.py`, plus expanded ORDER BY nested non-primitive (raw + stringified map/list) pandas/cuDF parity amplification in `graphistry/tests/compute/gfql/test_row_pipeline_ops.py`, including RAPIDS 25.02/26.02 dgx validation. diff --git a/bin/ci_cypher_surface_guard_baseline.json b/bin/ci_cypher_surface_guard_baseline.json index 6d32c224f7..c42f497305 100644 --- a/bin/ci_cypher_surface_guard_baseline.json +++ b/bin/ci_cypher_surface_guard_baseline.json @@ -13,5 +13,5 @@ "max_properties": 0 } }, - "lowering_py_max_lines": 8862 + "lowering_py_max_lines": 8973 } diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index 67c7ac15e0..91c0b963f8 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -1645,6 +1645,16 @@ def _rewrite_collection_alias_entities( if isinstance(node, UnaryOp): return UnaryOp(node.op, _rewrite_collection_alias_entities(node.operand, alias_targets=alias_targets)) if isinstance(node, BinaryOp): + if ( + node.op == "in" + and isinstance(node.left, Identifier) + and node.left.name in alias_targets + ): + return BinaryOp( + node.op, + _entity_wrapper_call(node.left.name, alias_targets=alias_targets), + _rewrite_collection_alias_entities(node.right, alias_targets=alias_targets), + ) return BinaryOp( node.op, _rewrite_collection_alias_entities(node.left, alias_targets=alias_targets), @@ -2409,7 +2419,9 @@ def _validate_aggregate_expr_scope( def _is_multiplicity_sensitive_aggregate(agg_spec: _AggregateSpec) -> bool: if agg_spec.func in {"sum", "avg"}: return True - if agg_spec.func in {"count", "collect"}: + if agg_spec.func == "collect": + return True + if agg_spec.func == "count": return not agg_spec.distinct return False @@ -6615,26 +6627,60 @@ def _lower_general_row_projection( non_aggregate_items.append(item) binding_row_aliases = _binding_row_aliases_for_match(query.match, alias_targets=alias_targets) + forced_binding_row_aliases = False if _requires_relationship_multiplicity_bindings( aggregate_specs=aggregate_specs, relationship_count=relationship_count, ): base_active_alias: Optional[str] = None can_force_bindings = True - if any(item.expression.text in alias_targets for item in non_aggregate_items): + whole_row_group_alias_refs = { + item.expression.text + for item in non_aggregate_items + if item.expression.text in alias_targets + } + aggregate_alias_refs: Set[str] = set() + for agg_spec in aggregate_specs: + if agg_spec.expr_text is None: + continue + try: + aggregate_alias_refs.update( + _expr_match_aliases( + agg_spec.expr_text, + alias_targets=alias_targets, + params=params, + field=query.return_.kind, + line=agg_spec.span_line, + column=agg_spec.span_column, + ) + ) + except GFQLValidationError: + can_force_bindings = False + break + allow_whole_row_binding_grouping = ( + bool(whole_row_group_alias_refs) + and can_force_bindings + and bool(aggregate_alias_refs) + and aggregate_alias_refs <= set(alias_targets.keys()) + and all(isinstance(alias_targets.get(alias_name), ASTNode) for alias_name in whole_row_group_alias_refs) + ) + if whole_row_group_alias_refs and not allow_whole_row_binding_grouping: # Keep whole-row grouping on the existing conservative path. # This preserves the current fail-fast boundary for relationship- # pattern grouped aggregates such as `RETURN a, count(*)`. can_force_bindings = False - try: - base_active_alias = _active_match_alias( - query, - alias_targets=alias_targets, - allowed_match_aliases=None, - params=params, - ) - except GFQLValidationError: - can_force_bindings = False + if allow_whole_row_binding_grouping: + base_active_alias = next(iter(whole_row_group_alias_refs)) + else: + try: + base_active_alias = _active_match_alias( + query, + alias_targets=alias_targets, + allowed_match_aliases=None, + params=params, + ) + except GFQLValidationError: + can_force_bindings = False if can_force_bindings and base_active_alias is not None: if isinstance(alias_targets.get(base_active_alias), ASTEdge): can_force_bindings = False @@ -6654,16 +6700,23 @@ def _lower_general_row_projection( except GFQLValidationError: can_force_bindings = False break + if allow_whole_row_binding_grouping: + if not refs <= set(alias_targets.keys()): + can_force_bindings = False + break + continue if len(refs) > 1 or (len(refs) == 1 and base_active_alias not in refs): can_force_bindings = False break if can_force_bindings: binding_row_aliases = set(alias_targets.keys()) - binding_row_aliases = _apply_bound_scope_membership( - binding_row_aliases, - alias_targets=alias_targets, - bound_visible_aliases=bound_visible_aliases, - ) + forced_binding_row_aliases = True + if not forced_binding_row_aliases: + binding_row_aliases = _apply_bound_scope_membership( + binding_row_aliases, + alias_targets=alias_targets, + bound_visible_aliases=bound_visible_aliases, + ) active_match_alias = _active_match_alias( query, alias_targets=alias_targets, @@ -6735,6 +6788,7 @@ def _lower_general_row_projection( expr_to_output: Dict[str, str] = {} available_columns: Set[str] = set() + result_projection: Optional[ResultProjectionPlan] = None if aggregate_specs: projection_fn = with_ if query.return_.kind == "with" else return_ @@ -6742,6 +6796,7 @@ def _lower_general_row_projection( key_names: List[str] = [] temp_names: Set[str] = set() hidden_group_key_names: Set[str] = set() + whole_row_group_aliases: List[str] = [] for item in non_aggregate_items: output_name = item.alias or item.expression.text @@ -6756,13 +6811,22 @@ def _lower_general_row_projection( if item.expression.text in alias_targets: alias_name = item.expression.text if alias_name != active_match_alias: - raise _unsupported( - "Cypher aggregate whole-row grouping currently supports the active MATCH alias only", - field=query.return_.kind, - value=item.expression.text, - line=item.span.line, - column=item.span.column, - ) + if not binding_row_aliases: + raise _unsupported( + "Cypher aggregate whole-row grouping currently supports the active MATCH alias only", + field=query.return_.kind, + value=item.expression.text, + line=item.span.line, + column=item.span.column, + ) + if alias_name not in binding_row_aliases: + raise _unsupported( + "Cypher aggregate whole-row grouping alias is not available in the active bindings-row scope", + field=query.return_.kind, + value=item.expression.text, + line=item.span.line, + column=item.span.column, + ) hidden_key_name = _fresh_temp_name(temp_names, "__cypher_group_key__") raw_key_expr = _whole_row_group_key_expr( alias_name, @@ -6771,20 +6835,26 @@ def _lower_general_row_projection( line=item.span.line, column=item.span.column, ) - pre_items.append((hidden_key_name, raw_key_expr)) - pre_items.append( - ( - output_name, - _whole_row_group_entity_expr( - alias_name, - alias_targets=alias_targets, - field=query.return_.kind, - line=item.span.line, - column=item.span.column, - ), + if binding_row_aliases: + pre_items.append((hidden_key_name, f"{alias_name}.{raw_key_expr}")) + key_names.append(hidden_key_name) + if alias_name not in whole_row_group_aliases: + whole_row_group_aliases.append(alias_name) + else: + pre_items.append((hidden_key_name, raw_key_expr)) + pre_items.append( + ( + output_name, + _whole_row_group_entity_expr( + alias_name, + alias_targets=alias_targets, + field=query.return_.kind, + line=item.span.line, + column=item.span.column, + ), + ) ) - ) - key_names.extend([hidden_key_name, output_name]) + key_names.extend([hidden_key_name, output_name]) hidden_group_key_names.add(hidden_key_name) available_columns.add(output_name) _add_output_mapping( @@ -6887,10 +6957,12 @@ def _lower_general_row_projection( alias_targets=alias_targets, active_match_alias=active_match_alias, ) + bindings_row_path = bool(binding_row_aliases) + alias_key_prefixes = [f"{alias_name}." for alias_name in whole_row_group_aliases] if whole_row_group_aliases else None if key_names: if len(pre_items) > 0: - row_steps.append(with_(pre_items)) - row_steps.append(group_by(key_names, aggregations)) + row_steps.append(with_(pre_items, extend=bindings_row_path)) + row_steps.append(group_by(key_names, aggregations, key_prefixes=alias_key_prefixes)) else: global_key = _fresh_temp_name(temp_names, "__cypher_group__") row_steps.append(with_([(global_key, 1)] + pre_items)) @@ -6898,49 +6970,87 @@ def _lower_general_row_projection( available_columns = {agg.output_name for agg in aggregate_specs} empty_result_row = _empty_aggregate_row(aggregate_specs) - if post_aggregate_items or hidden_group_key_names: - post_projection_items: List[Tuple[str, Any]] = [] - projected_columns: Set[str] = set() - for item in non_aggregate_items: - output_name = item.alias or item.expression.text - post_projection_items.append((output_name, output_name)) - projected_columns.add(output_name) - for agg_spec in aggregate_specs: - if agg_spec.output_name in projected_columns: - raise _unsupported( - "Duplicate Cypher projection names are not yet supported in local lowering", - field=query.return_.kind, - value=agg_spec.output_name, - line=agg_spec.span_line, - column=agg_spec.span_column, + if bindings_row_path and whole_row_group_aliases: + projection_alias = whole_row_group_aliases[0] + result_projection = ResultProjectionPlan( + alias=projection_alias, + table=cast( + Literal["nodes", "edges"], + _alias_table( + alias_targets[projection_alias], + alias=projection_alias, + line=query.return_.span.line, + column=query.return_.span.column, + semantic_entity_kinds=semantic_entity_kinds, + ), + ), + columns=tuple( + ResultProjectionColumn( + output_name=item.alias or item.expression.text, + kind="whole_row", + source_name=item.expression.text, ) - if agg_spec.output_name.startswith("__cypher_postagg__"): - continue - post_projection_items.append((agg_spec.output_name, agg_spec.output_name)) - projected_columns.add(agg_spec.output_name) - for plan in post_aggregate_items: - if plan.output_name in projected_columns: - raise _unsupported( - "Duplicate Cypher projection names are not yet supported in local lowering", - field=query.return_.kind, - value=plan.output_name, - line=plan.span_line, - column=plan.span_column, + for item in non_aggregate_items + if item.expression.text in alias_targets + ) + + tuple( + ResultProjectionColumn( + output_name=agg.output_name, + kind="expr", + source_name=agg.output_name, ) - post_projection_items.append( - ( - plan.output_name, - _row_expr_arg( - plan.expr, - params=params, - alias_targets={}, + for agg in aggregate_specs + if not agg.output_name.startswith("__cypher_postagg__") + ), + ) + + if post_aggregate_items or hidden_group_key_names: + if bindings_row_path and whole_row_group_aliases: + row_steps.append(drop_cols(list(hidden_group_key_names))) + available_columns = set(aggregate.output_name for aggregate in aggregate_specs) + else: + post_projection_items: List[Tuple[str, Any]] = [] + projected_columns: Set[str] = set() + for item in non_aggregate_items: + output_name = item.alias or item.expression.text + post_projection_items.append((output_name, output_name)) + projected_columns.add(output_name) + for agg_spec in aggregate_specs: + if agg_spec.output_name in projected_columns: + raise _unsupported( + "Duplicate Cypher projection names are not yet supported in local lowering", field=query.return_.kind, - ), + value=agg_spec.output_name, + line=agg_spec.span_line, + column=agg_spec.span_column, + ) + if agg_spec.output_name.startswith("__cypher_postagg__"): + continue + post_projection_items.append((agg_spec.output_name, agg_spec.output_name)) + projected_columns.add(agg_spec.output_name) + for plan in post_aggregate_items: + if plan.output_name in projected_columns: + raise _unsupported( + "Duplicate Cypher projection names are not yet supported in local lowering", + field=query.return_.kind, + value=plan.output_name, + line=plan.span_line, + column=plan.span_column, + ) + post_projection_items.append( + ( + plan.output_name, + _row_expr_arg( + plan.expr, + params=params, + alias_targets={}, + field=query.return_.kind, + ), + ) ) - ) - projected_columns.add(plan.output_name) - row_steps.append(projection_fn(post_projection_items)) - available_columns = projected_columns + projected_columns.add(plan.output_name) + row_steps.append(projection_fn(post_projection_items)) + available_columns = projected_columns elif not key_names: row_steps.append(projection_fn([(agg.output_name, agg.output_name) for agg in aggregate_specs])) else: @@ -7058,7 +7168,10 @@ def bind(self) -> "_EmptyRowGraph": Chain(exec_steps, where=lowered.where), seed_rows=seed_rows, post_processing=_normalize_post_processing( - CompiledCypherPostProcessing(empty_result_row=empty_result_row) + CompiledCypherPostProcessing( + result_projection=result_projection, + empty_result_row=empty_result_row, + ) ), ) diff --git a/graphistry/compute/gfql/cypher/parser.py b/graphistry/compute/gfql/cypher/parser.py index 79337d2d3e..d73560769c 100644 --- a/graphistry/compute/gfql/cypher/parser.py +++ b/graphistry/compute/gfql/cypher/parser.py @@ -1610,12 +1610,16 @@ def query_body(self, meta: Any, items: Sequence[Any]) -> CypherQuery: elif idx != len(stages) - 1: with_stages.append(stage) if reentry_match_clauses: + too_many_suffix_withs = ( + len(reentry_match_clauses) > 1 + and len(with_stages) > len(reentry_match_clauses) + 1 + ) if ( stages[0].clause.kind != "with" or stages[-1].clause.kind != "return" or any(stage.clause.kind != "with" for stage in stages[:-1]) or len(with_stages) < len(reentry_match_clauses) - or len(with_stages) > len(reentry_match_clauses) + 1 + or too_many_suffix_withs ): first_match = reentry_match_clauses[0] raise _to_syntax_error( diff --git a/graphistry/compute/gfql/cypher/reentry/compiletime.py b/graphistry/compute/gfql/cypher/reentry/compiletime.py index fd2abef49a..8f58b845d4 100644 --- a/graphistry/compute/gfql/cypher/reentry/compiletime.py +++ b/graphistry/compute/gfql/cypher/reentry/compiletime.py @@ -243,10 +243,12 @@ def _compile_bounded_reentry_query( span=first_unwind.span, ) query = rewritten_query - if not query.reentry_matches or len(query.with_stages) not in { - len(query.reentry_matches), - len(query.reentry_matches) + 1, - }: + too_few_withs = len(query.with_stages) < len(query.reentry_matches) + too_many_suffix_withs = ( + len(query.reentry_matches) > 1 + and len(query.with_stages) > len(query.reentry_matches) + 1 + ) + if not query.reentry_matches or too_few_withs or too_many_suffix_withs: raise _unsupported_at_span( "Cypher MATCH after WITH is only supported for alternating MATCH ... WITH ... MATCH ... [WITH ... MATCH ...] ... [WITH] RETURN read shapes in the local compiler", field="match", diff --git a/graphistry/compute/gfql/cypher/reentry/rewrite.py b/graphistry/compute/gfql/cypher/reentry/rewrite.py index 95398ef320..45e7869d5e 100644 --- a/graphistry/compute/gfql/cypher/reentry/rewrite.py +++ b/graphistry/compute/gfql/cypher/reentry/rewrite.py @@ -61,6 +61,23 @@ def _rewrite_reentry_expr_to_hidden_properties( has_non_source = bool(non_source_carried_props) if not carried_columns and not has_non_source: return expr + needs_scalar_rewrite = any( + re.search(rf"(? Tuple[bool, Any]: alias_names.append(extra_arg.value) else: return False, None - source_alias = alias_names[0] - if source_alias not in table_df.columns: + source_alias_name = alias_names[0] + source_alias = source_alias_name + source_table_df = table_df + entity_id = getattr(self, "_node" if fn == "__node_keys__" else "_edge", None) + prefixed_id_col = f"{source_alias_name}.{entity_id}" if entity_id else None + if prefixed_id_col is not None and prefixed_id_col in table_df.columns: + prefix = f"{source_alias_name}." + alias_cols = [ + col for col in table_df.columns + if isinstance(col, str) and col.startswith(prefix) + ] + source_table_df = table_df[alias_cols].copy().rename( + columns={col: col[len(prefix):] for col in alias_cols} + ) + source_alias = str(entity_id) + elif source_alias not in table_df.columns: return False, None out = entity_keys_series( - table_df, + source_table_df, alias_col=source_alias, table=("nodes" if fn == "__node_keys__" else "edges"), excluded=tuple(alias_names), ) - null_mask = self._gfql_null_mask(table_df, table_df[source_alias]) + null_mask = self._gfql_null_mask(source_table_df, source_table_df[source_alias]) if hasattr(out, "where"): out = self._gfql_mask_fill(out, null_mask, None) return True, out @@ -1044,22 +1058,32 @@ def _gfql_eval_expr_ast(self, table_df: Any, node: Any) -> Tuple[bool, Any]: entity_alias_names.append(extra_arg.value) else: return False, None - source_alias = entity_alias_names[0] - if source_alias not in table_df.columns: - # On bindings-row tables, resolve alias to alias.{node_id} (#880) - node_id = getattr(self, "_node", None) - id_col = f"{source_alias}.{node_id}" if node_id else None - if id_col is not None and id_col in table_df.columns: - source_alias = id_col - else: - return False, None + source_alias_name = entity_alias_names[0] + source_alias = source_alias_name + source_table_df = table_df + # On bindings-row tables, prefer alias.{id} even when a same-name + # marker column exists for the alias. + entity_id = getattr(self, "_node" if fn == "__node_entity__" else "_edge", None) + id_col = f"{source_alias_name}.{entity_id}" if entity_id else None + if id_col is not None and id_col in table_df.columns: + prefix = f"{source_alias_name}." + alias_cols = [ + col for col in table_df.columns + if isinstance(col, str) and col.startswith(prefix) + ] + source_table_df = table_df[alias_cols].copy().rename( + columns={col: col[len(prefix):] for col in alias_cols} + ) + source_alias = str(entity_id) + elif source_alias not in table_df.columns: + return False, None out = self._gfql_format_entity_series( - table_df, + source_table_df, alias_col=source_alias, table=("nodes" if fn == "__node_entity__" else "edges"), excluded=tuple(entity_alias_names), ) - null_mask = self._gfql_null_mask(table_df, table_df[source_alias]) + null_mask = self._gfql_null_mask(source_table_df, source_table_df[source_alias]) if hasattr(out, "where"): out = self._gfql_mask_fill(out, null_mask, None) return True, out diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index de838bd8ca..01b9a0ccb7 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -12674,6 +12674,35 @@ def test_string_cypher_rejects_obviously_non_boolean_operands_in_boolean_ops(que # ── Multi-alias WITH projection from connected MATCH (#880 / IC-4 shape) ── +def _mk_ic3_cross_country_shape_graph() -> _CypherTestGraph: + """Graph for IC-3-style carried row + collected city list reentry tests.""" + return _mk_graph( + pd.DataFrame( + { + "id": ["p1", "cityA", "friend1", "friend2", "friend3", "cityB", "cityC"], + "label__Person": [True, False, True, True, True, False, False], + "label__City": [False, True, False, False, False, True, True], + "name": ["", "CityA", "", "", "", "CityB", "CityC"], + } + ), + pd.DataFrame( + { + "s": ["p1", "p1", "p1", "p1", "friend1", "friend2", "friend3"], + "d": ["cityA", "friend1", "friend2", "friend3", "cityB", "cityA", "cityC"], + "type": [ + "IS_LOCATED_IN", + "KNOWS", + "KNOWS", + "KNOWS", + "IS_LOCATED_IN", + "IS_LOCATED_IN", + "IS_LOCATED_IN", + ], + } + ), + ) + + def _mk_ic4_shape_graph() -> _CypherTestGraph: """Graph for IC-4 multi-alias WITH tests: person-KNOWS-friend, post-HAS_CREATOR->friend, post-HAS_TAG->tag.""" return _mk_graph( @@ -13285,6 +13314,51 @@ def test_issue_1413_ic4_new_topics_multiple_chained_case_flags() -> None: ] +def test_issue_1413_ic3_cross_country_carried_row_collect_list_reentry_case_sum() -> None: + graph = _mk_ic3_cross_country_shape_graph() + result = graph.gfql( + "MATCH (person:Person {id: $personId})-[:IS_LOCATED_IN]->(city:City) " + "WITH person, collect(city) AS cities " + "MATCH (person)-[:KNOWS]-(friend:Person)-[:IS_LOCATED_IN]->(friendCity:City) " + "WHERE NOT person = friend AND NOT friendCity IN cities " + "WITH friend, " + "CASE WHEN friendCity.name = $countryXName THEN 1 ELSE 0 END AS messageX, " + "CASE WHEN friendCity.name = $countryYName THEN 1 ELSE 0 END AS messageY " + "WITH friend, sum(messageX) AS xCount, sum(messageY) AS yCount " + "RETURN friend.id AS friendId, xCount, yCount " + "ORDER BY friendId ASC", + params={"personId": "p1", "countryXName": "CityB", "countryYName": "CityC"}, + ) + + assert result._nodes.to_dict(orient="records") == [ + {"friendId": "friend1", "xCount": 1, "yCount": 0}, + {"friendId": "friend3", "xCount": 0, "yCount": 1}, + ] + + +def test_issue_1413_ic3_collect_distinct_entity_membership_with_post_aggregate_where() -> None: + graph = _mk_ic3_cross_country_shape_graph() + result = graph.gfql( + "MATCH (person:Person {id: $personId})-[:IS_LOCATED_IN]->(city:City) " + "WITH person, collect(DISTINCT city) AS cities " + "MATCH (person)-[:KNOWS]-(friend:Person)-[:IS_LOCATED_IN]->(friendCity:City) " + "WHERE NOT (friendCity IN cities) " + "WITH friend, " + "CASE WHEN friendCity.name = $countryXName THEN 1 ELSE 0 END AS messageX, " + "CASE WHEN friendCity.name = $countryYName THEN 1 ELSE 0 END AS messageY " + "WITH friend, sum(messageX) AS xCount, sum(messageY) AS yCount " + "WHERE xCount > 0 OR yCount > 0 " + "RETURN friend.id AS friendId, xCount, yCount " + "ORDER BY yCount DESC, friendId ASC", + params={"personId": "p1", "countryXName": "CityB", "countryYName": "CityC"}, + ) + + assert result._nodes.to_dict(orient="records") == [ + {"friendId": "friend3", "xCount": 0, "yCount": 1}, + {"friendId": "friend1", "xCount": 1, "yCount": 0}, + ] + + def test_issue_1038_ic4_return_side_case_expression_regression_lock() -> None: """Regression lock for #1038: RETURN-side CASE over IC4-shaped post timestamp ranges.""" graph = _mk_graph( From f111dac44f6d598c353c060b853d1c8ec9ec3444 Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Mon, 11 May 2026 01:04:08 -0700 Subject: [PATCH 2/2] Fix optional aggregate TCK boundary --- graphistry/compute/gfql/cypher/lowering.py | 2 +- graphistry/tests/compute/gfql/cypher/test_lowering.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index 91c0b963f8..6da808dda6 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -6659,7 +6659,7 @@ def _lower_general_row_projection( break allow_whole_row_binding_grouping = ( bool(whole_row_group_alias_refs) - and can_force_bindings + and can_force_bindings and not any(clause.optional for clause in query.matches) and bool(aggregate_alias_refs) and aggregate_alias_refs <= set(alias_targets.keys()) and all(isinstance(alias_targets.get(alias_name), ASTNode) for alias_name in whole_row_group_alias_refs) diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index 01b9a0ccb7..1e6cdcca7f 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -3520,6 +3520,13 @@ def test_string_cypher_failfast_relationship_whole_row_grouped_count_star_bounda graph.gfql("MATCH (a:L)-[rel]->(b) RETURN a, count(*)") +def test_string_cypher_failfast_optional_match_collect_null_whole_row_return_boundary() -> None: + graph = _mk_graph(pd.DataFrame({"id": ["n1"]}), pd.DataFrame({"s": [], "d": []})) + + with pytest.raises(GFQLValidationError, match="one MATCH source alias"): + graph.gfql("MATCH (n) OPTIONAL MATCH (n)-[:NOT_EXIST]->(x) RETURN n, collect(x)") + + def test_string_cypher_supports_optional_match_collect_alias_property() -> None: graph = _mk_graph( pd.DataFrame(