From 7beaf33aef5f700ccbd04c887234372b9f59d578 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 20 Apr 2026 09:39:21 +0100 Subject: [PATCH 1/5] Index OR --- .../lib/electric/shapes/filter/index.ex | 32 --- .../electric/shapes/filter/where_condition.ex | 196 ++++++++++-------- .../test/electric/shapes/filter_test.exs | 120 ++++++++++- 3 files changed, 223 insertions(+), 125 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/filter/index.ex b/packages/sync-service/lib/electric/shapes/filter/index.ex index 001230feba..d28af6bff0 100644 --- a/packages/sync-service/lib/electric/shapes/filter/index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/index.ex @@ -16,42 +16,10 @@ defmodule Electric.Shapes.Filter.Index do defp module_for("@>"), do: InclusionIndex defp module_for("subquery"), do: SubqueryIndex - # "in" delegates to EqualityIndex, registering the shape under each value - def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: "in"} = optimisation) do - eq_base = optimisation |> Map.delete(:values) |> Map.put(:operation, "=") - - for value <- optimisation.values do - EqualityIndex.add_shape(filter, where_cond_id, shape_id, Map.put(eq_base, :value, value)) - end - - :ok - end - def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation) end - def remove_shape( - %Filter{} = filter, - where_cond_id, - shape_id, - %{operation: "in"} = optimisation - ) do - eq_base = optimisation |> Map.delete(:values) |> Map.put(:operation, "=") - - results = - for value <- optimisation.values do - EqualityIndex.remove_shape( - filter, - where_cond_id, - shape_id, - Map.put(eq_base, :value, value) - ) - end - - if :deleted in results, do: :deleted, else: :ok - end - def remove_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation) end diff --git a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex index 1238de8516..087ead3620 100644 --- a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex +++ b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex @@ -19,6 +19,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do alias Electric.Replication.Eval.Parser.Func alias Electric.Replication.Eval.Parser.Ref alias Electric.Replication.Eval.Parser.RowExpr + alias Electric.Replication.PostgresInterop.Casting alias Electric.Shapes.Filter alias Electric.Shapes.Filter.Index alias Electric.Shapes.WhereClause @@ -31,24 +32,32 @@ defmodule Electric.Shapes.Filter.WhereCondition do end @doc """ - Returns `true` when the WHERE clause can use the primary equality/inclusion - indexes maintained by the filter. + Returns `true` when the WHERE clause can use the filter's indexed routing + path instead of relying entirely on `other_shapes`. """ @spec indexed_where?(Expr.t() | nil) :: boolean() - def indexed_where?(where_clause), do: optimise_where(where_clause) != :not_optimised + def indexed_where?(where_clause), do: planned_where(where_clause) != :not_optimised def add_shape(%Filter{where_cond_table: table} = filter, condition_id, shape_id, where_clause) do - case optimise_where(where_clause) do + case planned_where(where_clause) do :not_optimised -> - [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - other_shapes = Map.put(other_shapes, shape_id, where_clause) - :ets.insert(table, {condition_id, {index_keys, other_shapes}}) + add_shape_to_other_shapes(table, condition_id, shape_id, where_clause) - optimisation -> + {:split_or, left, right} -> + add_shape(filter, condition_id, shape_id, left) + add_shape(filter, condition_id, shape_id, right) + + {:index, optimisation} -> add_shape_to_index(filter, condition_id, shape_id, optimisation) end end + defp add_shape_to_other_shapes(table, condition_id, shape_id, where_clause) do + [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) + other_shapes = Map.put(other_shapes, shape_id, where_clause) + :ets.insert(table, {condition_id, {index_keys, other_shapes}}) + end + defp add_shape_to_index( %Filter{where_cond_table: table} = filter, condition_id, @@ -64,23 +73,43 @@ defmodule Electric.Shapes.Filter.WhereCondition do end @doc false - defp optimise_where(%Expr{eval: eval}), do: optimise_where(eval) + defp planned_where(nil), do: :not_optimised + + defp planned_where(%Expr{eval: eval}), do: planned_where(eval) + + defp planned_where(%Func{name: "or", args: [left, right]}) do + if indexed_eval?(left) and indexed_eval?(right) do + {:split_or, where_expr(left), where_expr(right)} + else + :not_optimised + end + end + + defp planned_where(eval) do + case extract_index_optimisation(eval) do + {:ok, optimisation, residual} -> + {:index, %{optimisation | and_where: maybe_where_expr(residual)}} + + :error -> + :not_optimised + end + end - defp optimise_where(%Func{ + defp direct_optimisation(%Func{ name: ~s("="), args: [%Ref{path: [field], type: type}, %Const{value: value}] }) do %{operation: "=", field: field, type: type, value: value, and_where: nil} end - defp optimise_where(%Func{ + defp direct_optimisation(%Func{ name: ~s("="), args: [%Const{value: value}, %Ref{path: [field], type: type}] }) do %{operation: "=", field: field, type: type, value: value, and_where: nil} end - defp optimise_where(%Func{ + defp direct_optimisation(%Func{ name: ~s("@>"), args: [%Ref{path: [field], type: type}, %Const{value: value}] }) @@ -88,7 +117,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do %{operation: "@>", field: field, type: type, value: value, and_where: nil} end - defp optimise_where(%Func{ + defp direct_optimisation(%Func{ name: ~s("<@"), args: [%Const{value: value}, %Ref{path: [field], type: type}] }) @@ -97,7 +126,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do end # const = ANY(array_ref) → reuse @> index with [const] as single-element array - defp optimise_where(%Func{ + defp direct_optimisation(%Func{ name: "any", args: [ %Func{ @@ -111,51 +140,70 @@ defmodule Electric.Shapes.Filter.WhereCondition do %{operation: "@>", field: field, type: type, value: [value], and_where: nil} end - defp optimise_where(%Func{name: "sublink_membership_check"} = subquery) do + defp direct_optimisation(%Func{name: "sublink_membership_check"} = subquery) do subquery_optimisation(subquery, :positive) end - defp optimise_where(%Func{ + defp direct_optimisation(%Func{ name: "not", args: [%Func{name: "sublink_membership_check"} = subquery] }) do subquery_optimisation(subquery, :negated) end - # field IN (const1, const2, ...) → reuse = index with multiple values - defp optimise_where(%Func{name: "or"} = expr) do - case flatten_or_equalities(expr) do - {:ok, field, type, values} -> - %{operation: "in", field: field, type: type, values: values, and_where: nil} + defp direct_optimisation(_), do: :not_optimised + + defp extract_index_optimisation(%Expr{eval: eval}), do: extract_index_optimisation(eval) + + defp extract_index_optimisation(%Func{name: "and", args: [left, right]}) do + case extract_index_optimisation(left) do + {:ok, optimisation, residual} -> + {:ok, optimisation, and_eval(residual, right)} :error -> - :not_optimised + case extract_index_optimisation(right) do + {:ok, optimisation, residual} -> + {:ok, optimisation, and_eval(left, residual)} + + :error -> + :error + end end end - defp optimise_where(%Func{name: "and", args: [arg1, arg2]}) do - case {optimise_where(arg1), optimise_where(arg2)} do - {%{operation: op, and_where: nil} = params, _} when op in ["=", "@>", "in", "subquery"] -> - %{params | and_where: where_expr(arg2)} - - {_, %{operation: op, and_where: nil} = params} when op in ["=", "@>", "in", "subquery"] -> - %{params | and_where: where_expr(arg1)} - - _ -> - :not_optimised + defp extract_index_optimisation(eval) do + case direct_optimisation(eval) do + :not_optimised -> :error + optimisation -> {:ok, optimisation, nil} end end - defp optimise_where(_), do: :not_optimised + defp indexed_eval?(eval), do: planned_where(eval) != :not_optimised - # "in" shares the EqualityIndex with "=", so use the same index key - defp index_key("in"), do: "=" defp index_key(op), do: op + defp maybe_where_expr(nil), do: nil + defp maybe_where_expr(eval), do: where_expr(eval) + defp where_expr(eval) do %Expr{eval: eval, used_refs: Parser.find_refs(eval), returns: :bool} end + defp and_eval(nil, nil), do: nil + defp and_eval(nil, eval), do: eval + defp and_eval(eval, nil), do: eval + + defp and_eval(left, right) do + %Func{ + args: [left, right], + type: :bool, + implementation: &Casting.pg_and/2, + name: "and", + strict?: false, + location: min(Map.get(left, :location, 0), Map.get(right, :location, 0)) + } + end + defp subquery_optimisation( %Func{name: "sublink_membership_check", args: [testexpr, %Ref{path: subquery_ref}]} = _subquery, @@ -177,44 +225,6 @@ defmodule Electric.Shapes.Filter.WhereCondition do end end - # Flatten an OR chain of equalities on the same field into {field, type, [values]} - defp flatten_or_equalities(expr) do - case collect_or_equalities(expr, []) do - {:ok, [{field, type, _} | _] = pairs} -> - if Enum.all?(pairs, fn {f, t, _} -> f == field and t == type end) do - values = Enum.map(pairs, fn {_, _, v} -> v end) - {:ok, field, type, values} - else - :error - end - - _ -> - :error - end - end - - defp collect_or_equalities(%Func{name: "or", args: [left, right]}, acc) do - with {:ok, acc} <- collect_or_equalities(left, acc) do - collect_or_equalities(right, acc) - end - end - - defp collect_or_equalities( - %Func{name: ~s("="), args: [%Ref{path: [field], type: type}, %Const{value: value}]}, - acc - ) do - {:ok, [{field, type, value} | acc]} - end - - defp collect_or_equalities( - %Func{name: ~s("="), args: [%Const{value: value}, %Ref{path: [field], type: type}]}, - acc - ) do - {:ok, [{field, type, value} | acc]} - end - - defp collect_or_equalities(_, _acc), do: :error - @doc """ Remove a shape from a WhereCondition. @@ -228,19 +238,29 @@ defmodule Electric.Shapes.Filter.WhereCondition do shape_id, where_clause ) do - case optimise_where(where_clause) do + case planned_where(where_clause) do :not_optimised -> remove_shape_from_other_shapes(table, condition_id, shape_id) - optimisation -> + {:split_or, left, right} -> + _ = remove_shape(filter, condition_id, shape_id, left) + _ = remove_shape(filter, condition_id, shape_id, right) + condition_status(table, condition_id) + + {:index, optimisation} -> remove_shape_from_index(filter, condition_id, shape_id, optimisation) end end defp remove_shape_from_other_shapes(table, condition_id, shape_id) do - [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - other_shapes = Map.delete(other_shapes, shape_id) - update_or_delete_condition(table, condition_id, index_keys, other_shapes) + case :ets.lookup(table, condition_id) do + [] -> + :deleted + + [{_, {index_keys, other_shapes}}] -> + other_shapes = Map.delete(other_shapes, shape_id) + update_or_delete_condition(table, condition_id, index_keys, other_shapes) + end end defp remove_shape_from_index( @@ -251,16 +271,28 @@ defmodule Electric.Shapes.Filter.WhereCondition do ) do case Index.remove_shape(filter, condition_id, shape_id, optimisation) do :deleted -> - [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - key = {optimisation.field, index_key(optimisation.operation)} - index_keys = MapSet.delete(index_keys, key) - update_or_delete_condition(table, condition_id, index_keys, other_shapes) + case :ets.lookup(table, condition_id) do + [] -> + :deleted + + [{_, {index_keys, other_shapes}}] -> + key = {optimisation.field, index_key(optimisation.operation)} + index_keys = MapSet.delete(index_keys, key) + update_or_delete_condition(table, condition_id, index_keys, other_shapes) + end :ok -> :ok end end + defp condition_status(table, condition_id) do + case :ets.lookup(table, condition_id) do + [] -> :deleted + [_] -> :ok + end + end + defp update_or_delete_condition(table, condition_id, index_keys, other_shapes) when index_keys == %MapSet{} and other_shapes == %{} do :ets.delete(table, condition_id) diff --git a/packages/sync-service/test/electric/shapes/filter_test.exs b/packages/sync-service/test/electric/shapes/filter_test.exs index 1cd9f48519..646acb24ed 100644 --- a/packages/sync-service/test/electric/shapes/filter_test.exs +++ b/packages/sync-service/test/electric/shapes/filter_test.exs @@ -29,6 +29,18 @@ defmodule Electric.Shapes.FilterTest do assert Filter.indexed_shape?(shape) end + test "returns true for OR shapes when both branches are indexed" do + shape = Shape.new!("t1", where: "id = 7 OR id = 8", inspector: @inspector) + + assert Filter.indexed_shape?(shape) + end + + test "returns false for OR shapes when one branch is not indexed" do + shape = Shape.new!("t1", where: "id = 7 OR number > 5", inspector: @inspector) + + refute Filter.indexed_shape?(shape) + end + test "returns false for shapes without an indexable where clause" do shape = Shape.new!("t1", inspector: @inspector) @@ -460,6 +472,32 @@ defmodule Electric.Shapes.FilterTest do {%{"id" => "1", "number" => "3"}, false}, {%{"id" => "3", "number" => "6"}, false} ] + }, + %{ + where: "id = 1 OR id = 2", + records: [ + {%{"id" => "1"}, true}, + {%{"id" => "2"}, true}, + {%{"id" => "3"}, false} + ] + }, + %{ + where: "id = 1 AND (number = 2 OR number = 3)", + records: [ + {%{"id" => "1", "number" => "2"}, true}, + {%{"id" => "1", "number" => "3"}, true}, + {%{"id" => "1", "number" => "4"}, false}, + {%{"id" => "2", "number" => "2"}, false} + ] + }, + %{ + where: "(id = 1 OR id = 2) AND number = 3", + records: [ + {%{"id" => "1", "number" => "3"}, true}, + {%{"id" => "2", "number" => "3"}, true}, + {%{"id" => "1", "number" => "4"}, false}, + {%{"id" => "3", "number" => "3"}, false} + ] } ] @@ -498,9 +536,23 @@ defmodule Electric.Shapes.FilterTest do Shape.new!("table", where: "1 = ANY(an_array)", inspector: @inspector), Shape.new!("table", where: "2 = ANY(an_array)", inspector: @inspector), Shape.new!("table", where: "id = 1 AND 1 = ANY(an_array)", inspector: @inspector), + Shape.new!("table", where: "id = 1 OR id = 2", inspector: @inspector), + Shape.new!("table", where: "id = 1 OR number > 5", inspector: @inspector), + Shape.new!("table", where: "id = 1 AND (number = 2 OR number = 3)", inspector: @inspector), + Shape.new!("table", where: "(id = 1 OR id = 2) AND number = 3", inspector: @inspector), Shape.new!("table", where: "id IN (1, 2, 3)", inspector: @inspector), Shape.new!("table", where: "id IN (4, 5)", inspector: @inspector), Shape.new!("table", where: "id IN (1, 2) AND number > 5", inspector: @inspector), + Shape.new!("table", + where: "id IN (SELECT id FROM another_table) OR id = 1", + inspector: @inspector, + feature_flags: ["allow_subqueries"] + ), + Shape.new!("table", + where: "id IN (SELECT id FROM another_table) OR number > 5", + inspector: @inspector, + feature_flags: ["allow_subqueries"] + ), Shape.new!("table", where: "id IN (SELECT id FROM another_table)", inspector: @inspector, @@ -765,7 +817,7 @@ defmodule Electric.Shapes.FilterTest do test "where clause in the form `const = ANY(array_field)` is optimised" do # Same shape count as @> but higher budget per shape because the ANY - # AST is deeper to pattern-match through optimise_where + # AST is deeper to pattern-match through the planner shape_count = @shape_count * 5 max_reductions = @max_reductions * 10 @@ -790,15 +842,16 @@ defmodule Electric.Shapes.FilterTest do end) end - test "where clause in the form `field IN (const1, const2, ...)` is optimised" do + test "where clause in the form `field = const1 OR field = const2` is optimised" do + max_reductions = @max_reductions * 2 filter = Filter.new() Enum.each(1..@shape_count, fn i -> shape = - Shape.new!("t1", where: "id IN (#{i}, #{i + @shape_count})", inspector: @inspector) + Shape.new!("t1", where: "id = #{i} OR id = #{i + @shape_count}", inspector: @inspector) add_reductions = reductions(fn -> Filter.add_shape(filter, i, shape) end) - assert add_reductions < @max_reductions + assert add_reductions < max_reductions end) change = change("t1", %{"id" => "7"}) @@ -806,11 +859,11 @@ defmodule Electric.Shapes.FilterTest do affected_reductions = reductions(fn -> Filter.affected_shapes(filter, change) end) - assert affected_reductions < @max_reductions + assert affected_reductions < max_reductions Enum.each(1..@shape_count, fn i -> remove_reductions = reductions(fn -> Filter.remove_shape(filter, i) end) - assert remove_reductions < @max_reductions + assert remove_reductions < max_reductions end) end @@ -1007,13 +1060,11 @@ defmodule Electric.Shapes.FilterTest do "CREATE TABLE IF NOT EXISTS or_parent (id INT PRIMARY KEY)", "CREATE TABLE IF NOT EXISTS or_child (id INT PRIMARY KEY, par_id INT REFERENCES or_parent(id), value TEXT NOT NULL)" ] - test "non-optimisable OR+subquery shape is affected by root table changes", %{ + test "optimisable OR+subquery shape is affected by root table changes", %{ inspector: inspector } do - # Shape with OR combining a subquery and a simple condition. - # OR is not optimisable, so the shape lands in other_shapes AND - # gets registered in the sublink inverted index. Root table changes - # must still be routed to this shape once seeded. + # Both OR branches are indexable, so the shape is registered twice in the + # filter tree: once for the subquery branch and once for the simple branch. {:ok, shape} = Shape.new("or_child", inspector: inspector, @@ -1067,6 +1118,53 @@ defmodule Electric.Shapes.FilterTest do assert Filter.affected_shapes(filter, update_into_shape) == MapSet.new(["shape1"]) end + @tag with_sql: [ + "CREATE TABLE IF NOT EXISTS or_parent (id INT PRIMARY KEY)", + "CREATE TABLE IF NOT EXISTS or_child (id INT PRIMARY KEY, par_id INT REFERENCES or_parent(id), value TEXT NOT NULL)" + ] + test "mixed OR+subquery shape falls back to other_shapes verification", %{ + inspector: inspector + } do + {:ok, shape} = + Shape.new("or_child", + inspector: inspector, + where: "par_id IN (SELECT id FROM or_parent) OR value LIKE 'target%'" + ) + + filter = Filter.new() + filter = Filter.add_shape(filter, "shape1", shape) + + index = Filter.subquery_index(filter) + subquery_ref = ["$sublink", "0"] + + for value <- [1, 2, 3] do + SubqueryIndex.add_value(index, "shape1", subquery_ref, 0, value) + end + + SubqueryIndex.mark_ready(index, "shape1") + + insert_matching_value = %NewRecord{ + relation: {"public", "or_child"}, + record: %{"id" => "99", "par_id" => "99", "value" => "target-me"} + } + + assert Filter.affected_shapes(filter, insert_matching_value) == MapSet.new(["shape1"]) + + insert_matching_subquery = %NewRecord{ + relation: {"public", "or_child"}, + record: %{"id" => "10", "par_id" => "2", "value" => "other"} + } + + assert Filter.affected_shapes(filter, insert_matching_subquery) == MapSet.new(["shape1"]) + + insert_no_match = %NewRecord{ + relation: {"public", "or_child"}, + record: %{"id" => "50", "par_id" => "99", "value" => "other"} + } + + assert Filter.affected_shapes(filter, insert_no_match) == MapSet.new([]) + end + @tag with_sql: [ "CREATE TABLE IF NOT EXISTS like_parent_unseeded (id INT PRIMARY KEY)", "CREATE TABLE IF NOT EXISTS like_child_unseeded (id INT PRIMARY KEY, name TEXT NOT NULL, parent_id INT REFERENCES like_parent_unseeded(id))" From 93294b38203a7a44f8cf19d26fb5556d7731bb70 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 20 Apr 2026 09:46:14 +0100 Subject: [PATCH 2/5] Add changeset for OR indexing --- .changeset/stale-pears-dance.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/stale-pears-dance.md diff --git a/.changeset/stale-pears-dance.md b/.changeset/stale-pears-dance.md new file mode 100644 index 0000000000..beb3f77709 --- /dev/null +++ b/.changeset/stale-pears-dance.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Optimise OR routing in sync-service filters by indexing OR branches when both sides are indexable, removing the dedicated IN special case, and falling back to `other_shapes` when an OR branch is not optimisable. From 20301fb752bb8b8fce04d4fc2c76a9ced5bc8c06 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 20 Apr 2026 10:29:51 +0100 Subject: [PATCH 3/5] Refactor OR planning around optimise_where --- .../electric/shapes/filter/where_condition.ex | 110 ++++++++---------- 1 file changed, 47 insertions(+), 63 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex index 087ead3620..db57219dde 100644 --- a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex +++ b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex @@ -36,18 +36,18 @@ defmodule Electric.Shapes.Filter.WhereCondition do path instead of relying entirely on `other_shapes`. """ @spec indexed_where?(Expr.t() | nil) :: boolean() - def indexed_where?(where_clause), do: planned_where(where_clause) != :not_optimised + def indexed_where?(where_clause), do: optimise_where(where_clause) != :not_optimised def add_shape(%Filter{where_cond_table: table} = filter, condition_id, shape_id, where_clause) do - case planned_where(where_clause) do + case optimise_where(where_clause) do :not_optimised -> add_shape_to_other_shapes(table, condition_id, shape_id, where_clause) - {:split_or, left, right} -> + {:or, left, right} -> add_shape(filter, condition_id, shape_id, left) add_shape(filter, condition_id, shape_id, right) - {:index, optimisation} -> + %{operation: _} = optimisation -> add_shape_to_index(filter, condition_id, shape_id, optimisation) end end @@ -73,43 +73,46 @@ defmodule Electric.Shapes.Filter.WhereCondition do end @doc false - defp planned_where(nil), do: :not_optimised + defp optimise_where(nil), do: :not_optimised - defp planned_where(%Expr{eval: eval}), do: planned_where(eval) + defp optimise_where(%Expr{eval: eval}), do: optimise_where(eval) - defp planned_where(%Func{name: "or", args: [left, right]}) do - if indexed_eval?(left) and indexed_eval?(right) do - {:split_or, where_expr(left), where_expr(right)} + defp optimise_where(%Func{name: "or", args: [left, right]}) do + if optimise_where(left) != :not_optimised and optimise_where(right) != :not_optimised do + {:or, where_expr(left), where_expr(right)} else :not_optimised end end - defp planned_where(eval) do - case extract_index_optimisation(eval) do - {:ok, optimisation, residual} -> - {:index, %{optimisation | and_where: maybe_where_expr(residual)}} + defp optimise_where(%Func{name: "and", args: [left, right]}) do + case {optimise_where(left), optimise_where(right)} do + {%{operation: _} = optimisation, _} -> + %{optimisation | and_where: merge_and_where(optimisation.and_where, right)} - :error -> + {_, %{operation: _} = optimisation} -> + %{optimisation | and_where: merge_and_where(left, optimisation.and_where)} + + _ -> :not_optimised end end - defp direct_optimisation(%Func{ + defp optimise_where(%Func{ name: ~s("="), args: [%Ref{path: [field], type: type}, %Const{value: value}] }) do %{operation: "=", field: field, type: type, value: value, and_where: nil} end - defp direct_optimisation(%Func{ + defp optimise_where(%Func{ name: ~s("="), args: [%Const{value: value}, %Ref{path: [field], type: type}] }) do %{operation: "=", field: field, type: type, value: value, and_where: nil} end - defp direct_optimisation(%Func{ + defp optimise_where(%Func{ name: ~s("@>"), args: [%Ref{path: [field], type: type}, %Const{value: value}] }) @@ -117,7 +120,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do %{operation: "@>", field: field, type: type, value: value, and_where: nil} end - defp direct_optimisation(%Func{ + defp optimise_where(%Func{ name: ~s("<@"), args: [%Const{value: value}, %Ref{path: [field], type: type}] }) @@ -126,7 +129,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do end # const = ANY(array_ref) → reuse @> index with [const] as single-element array - defp direct_optimisation(%Func{ + defp optimise_where(%Func{ name: "any", args: [ %Func{ @@ -140,70 +143,51 @@ defmodule Electric.Shapes.Filter.WhereCondition do %{operation: "@>", field: field, type: type, value: [value], and_where: nil} end - defp direct_optimisation(%Func{name: "sublink_membership_check"} = subquery) do + defp optimise_where(%Func{name: "sublink_membership_check"} = subquery) do subquery_optimisation(subquery, :positive) end - defp direct_optimisation(%Func{ + defp optimise_where(%Func{ name: "not", args: [%Func{name: "sublink_membership_check"} = subquery] }) do subquery_optimisation(subquery, :negated) end - defp direct_optimisation(_), do: :not_optimised - - defp extract_index_optimisation(%Expr{eval: eval}), do: extract_index_optimisation(eval) - - defp extract_index_optimisation(%Func{name: "and", args: [left, right]}) do - case extract_index_optimisation(left) do - {:ok, optimisation, residual} -> - {:ok, optimisation, and_eval(residual, right)} - - :error -> - case extract_index_optimisation(right) do - {:ok, optimisation, residual} -> - {:ok, optimisation, and_eval(left, residual)} - - :error -> - :error - end - end - end - - defp extract_index_optimisation(eval) do - case direct_optimisation(eval) do - :not_optimised -> :error - optimisation -> {:ok, optimisation, nil} - end - end - - defp indexed_eval?(eval), do: planned_where(eval) != :not_optimised + defp optimise_where(_), do: :not_optimised defp index_key(op), do: op - defp maybe_where_expr(nil), do: nil - defp maybe_where_expr(eval), do: where_expr(eval) - defp where_expr(eval) do %Expr{eval: eval, used_refs: Parser.find_refs(eval), returns: :bool} end - defp and_eval(nil, nil), do: nil - defp and_eval(nil, eval), do: eval - defp and_eval(eval, nil), do: eval + defp merge_and_where(nil, nil), do: nil + + defp merge_and_where(left, nil), do: normalise_where(left) - defp and_eval(left, right) do - %Func{ - args: [left, right], + defp merge_and_where(nil, right), do: normalise_where(right) + + defp merge_and_where(left, right) do + left_eval = extract_eval(left) + right_eval = extract_eval(right) + + where_expr(%Func{ + args: [left_eval, right_eval], type: :bool, implementation: &Casting.pg_and/2, name: "and", strict?: false, - location: min(Map.get(left, :location, 0), Map.get(right, :location, 0)) - } + location: min(Map.get(left_eval, :location, 0), Map.get(right_eval, :location, 0)) + }) end + defp normalise_where(%Expr{} = expr), do: expr + defp normalise_where(eval), do: where_expr(eval) + + defp extract_eval(%Expr{eval: eval}), do: eval + defp extract_eval(eval), do: eval + defp subquery_optimisation( %Func{name: "sublink_membership_check", args: [testexpr, %Ref{path: subquery_ref}]} = _subquery, @@ -238,16 +222,16 @@ defmodule Electric.Shapes.Filter.WhereCondition do shape_id, where_clause ) do - case planned_where(where_clause) do + case optimise_where(where_clause) do :not_optimised -> remove_shape_from_other_shapes(table, condition_id, shape_id) - {:split_or, left, right} -> + {:or, left, right} -> _ = remove_shape(filter, condition_id, shape_id, left) _ = remove_shape(filter, condition_id, shape_id, right) condition_status(table, condition_id) - {:index, optimisation} -> + %{operation: _} = optimisation -> remove_shape_from_index(filter, condition_id, shape_id, optimisation) end end From 58f70fbc74ac7976f90173da5ec32aa07cb30ce0 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 20 Apr 2026 10:48:40 +0100 Subject: [PATCH 4/5] Remove redundant filter index key helper --- .../lib/electric/shapes/filter/where_condition.ex | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex index db57219dde..568df4b039 100644 --- a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex +++ b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex @@ -65,7 +65,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do optimisation ) do [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - key = {optimisation.field, index_key(optimisation.operation)} + key = {optimisation.field, optimisation.operation} index_keys = MapSet.put(index_keys, key) :ets.insert(table, {condition_id, {index_keys, other_shapes}}) @@ -156,8 +156,6 @@ defmodule Electric.Shapes.Filter.WhereCondition do defp optimise_where(_), do: :not_optimised - defp index_key(op), do: op - defp where_expr(eval) do %Expr{eval: eval, used_refs: Parser.find_refs(eval), returns: :bool} end @@ -260,7 +258,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do :deleted [{_, {index_keys, other_shapes}}] -> - key = {optimisation.field, index_key(optimisation.operation)} + key = {optimisation.field, optimisation.operation} index_keys = MapSet.delete(index_keys, key) update_or_delete_condition(table, condition_id, index_keys, other_shapes) end From 351b81ee9db558473853985d131e2ef423824490 Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 22 Apr 2026 10:25:44 +0100 Subject: [PATCH 5/5] Fix shared-prefix OR residual routing bug --- .../lib/electric/shapes/filter/index.ex | 20 ++++-- .../shapes/filter/indexes/equality_index.ex | 26 +++++-- .../shapes/filter/indexes/inclusion_index.ex | 28 ++++++-- .../shapes/filter/indexes/subquery_index.ex | 69 +++++++++++++------ .../electric/shapes/filter/where_condition.ex | 61 ++++++++++------ .../shapes/filter/subquery_index_test.exs | 16 ++--- .../shapes/filter/subquery_node_test.exs | 9 ++- .../test/electric/shapes/filter_test.exs | 12 ++++ 8 files changed, 175 insertions(+), 66 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/filter/index.ex b/packages/sync-service/lib/electric/shapes/filter/index.ex index d28af6bff0..3420cd7067 100644 --- a/packages/sync-service/lib/electric/shapes/filter/index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/index.ex @@ -16,12 +16,24 @@ defmodule Electric.Shapes.Filter.Index do defp module_for("@>"), do: InclusionIndex defp module_for("subquery"), do: SubqueryIndex - def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do - module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation) + def add_shape( + %Filter{} = filter, + where_cond_id, + shape_id, + %{operation: op} = optimisation, + branch_key + ) do + module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation, branch_key) end - def remove_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do - module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation) + def remove_shape( + %Filter{} = filter, + where_cond_id, + shape_id, + %{operation: op} = optimisation, + branch_key + ) do + module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation, branch_key) end def affected_shapes(%Filter{} = filter, where_cond_id, field, operation, record) do diff --git a/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex b/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex index 421e375c95..8df4b77067 100644 --- a/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex @@ -20,7 +20,13 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do @env Env.new() - def add_shape(%Filter{eq_index_table: table} = filter, condition_id, shape_id, optimisation) do + def add_shape( + %Filter{eq_index_table: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do %{field: field, type: type, value: value, and_where: and_where} = optimisation key = {condition_id, field, value} @@ -39,10 +45,16 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do existing_id end - WhereCondition.add_shape(filter, next_condition_id, shape_id, and_where) + WhereCondition.add_shape(filter, next_condition_id, shape_id, and_where, branch_key) end - def remove_shape(%Filter{eq_index_table: table} = filter, condition_id, shape_id, optimisation) do + def remove_shape( + %Filter{eq_index_table: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do %{field: field, value: value, and_where: and_where} = optimisation key = {condition_id, field, value} @@ -51,7 +63,13 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do :deleted [{_, {_type, next_condition_id}}] -> - case WhereCondition.remove_shape(filter, next_condition_id, shape_id, and_where) do + case WhereCondition.remove_shape( + filter, + next_condition_id, + shape_id, + and_where, + branch_key + ) do :deleted -> :ets.delete(table, key) diff --git a/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex b/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex index f6ba03ec8e..0d0710cb14 100644 --- a/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex @@ -30,7 +30,13 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do @env Env.new() - def add_shape(%Filter{incl_index_table: table} = filter, condition_id, shape_id, optimisation) do + def add_shape( + %Filter{incl_index_table: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do %{field: field, type: type, value: array_value, and_where: and_where} = optimisation :ets.insert(table, {{:type, condition_id, field}, type}) @@ -42,7 +48,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do condition_id: condition_id, field: field, shape_id: shape_id, - and_where: and_where + and_where: and_where, + branch_key: branch_key } add_shape_to_node(ctx, [], values) @@ -92,7 +99,13 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do existing_id end - WhereCondition.add_shape(ctx.filter, node_condition_id, ctx.shape_id, ctx.and_where) + WhereCondition.add_shape( + ctx.filter, + node_condition_id, + ctx.shape_id, + ctx.and_where, + ctx.branch_key + ) end defp get_or_create_node(table, node_key) do @@ -125,7 +138,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do %Filter{incl_index_table: table} = filter, condition_id, shape_id, - optimisation + optimisation, + branch_key ) do %{field: field, value: array_value, and_where: and_where} = optimisation ordered = array_value |> Enum.sort() |> Enum.dedup() @@ -136,7 +150,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do condition_id: condition_id, field: field, shape_id: shape_id, - and_where: and_where + and_where: and_where, + branch_key: branch_key } remove_shape_from_node(ctx, [], ordered) @@ -198,7 +213,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do ctx.filter, node_condition_id, ctx.shape_id, - ctx.and_where + ctx.and_where, + ctx.branch_key ) do :deleted -> :ets.insert(ctx.table, {node_key, %{node | condition_id: nil}}) diff --git a/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex b/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex index a34d29d962..47632a854d 100644 --- a/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex @@ -105,20 +105,33 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do @doc """ Register a shape on a concrete subquery filter node. """ - @spec add_shape(Filter.t(), reference(), term(), map()) :: :ok - def add_shape(%Filter{subquery_index: table} = filter, condition_id, shape_id, optimisation) do + @spec add_shape(Filter.t(), reference(), term(), map(), [atom()]) :: :ok + def add_shape( + %Filter{subquery_index: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do node_id = {condition_id, optimisation.field} next_condition_id = make_ref() WhereCondition.init(filter, next_condition_id) - WhereCondition.add_shape(filter, next_condition_id, shape_id, optimisation.and_where) + + WhereCondition.add_shape( + filter, + next_condition_id, + shape_id, + optimisation.and_where, + branch_key + ) ensure_node_meta(table, node_id, optimisation.testexpr) :ets.insert( table, {{:node_shape, node_id}, - {shape_id, optimisation.dep_index, optimisation.polarity, next_condition_id}} + {shape_id, optimisation.dep_index, optimisation.polarity, next_condition_id, branch_key}} ) if optimisation.polarity == :negated do @@ -128,13 +141,13 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do :ets.insert( table, {{:shape_node, shape_id}, - {node_id, optimisation.dep_index, optimisation.polarity, next_condition_id}} + {node_id, optimisation.dep_index, optimisation.polarity, next_condition_id, branch_key}} ) :ets.insert( table, {{:shape_dep_node, shape_id, optimisation.dep_index}, - {node_id, optimisation.polarity, next_condition_id}} + {node_id, optimisation.polarity, next_condition_id, branch_key}} ) :ets.insert(table, {{:node_fallback, node_id}, {shape_id, next_condition_id}}) @@ -144,21 +157,33 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do @doc """ Remove a shape from a concrete subquery filter node. """ - @spec remove_shape(Filter.t(), reference(), term(), map()) :: :deleted | :ok - def remove_shape(%Filter{subquery_index: table} = filter, condition_id, shape_id, optimisation) do + @spec remove_shape(Filter.t(), reference(), term(), map(), [atom()]) :: :deleted | :ok + def remove_shape( + %Filter{subquery_index: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do node_id = {condition_id, optimisation.field} - case node_shape_entry_for_shape(table, shape_id, node_id) do + case node_shape_entry_for_shape(table, shape_id, node_id, branch_key) do nil -> :deleted {dep_index, polarity, next_condition_id} -> _ = - WhereCondition.remove_shape(filter, next_condition_id, shape_id, optimisation.and_where) + WhereCondition.remove_shape( + filter, + next_condition_id, + shape_id, + optimisation.and_where, + branch_key + ) :ets.match_delete( table, - {{:node_shape, node_id}, {shape_id, dep_index, polarity, next_condition_id}} + {{:node_shape, node_id}, {shape_id, dep_index, polarity, next_condition_id, branch_key}} ) if polarity == :negated do @@ -170,12 +195,13 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do :ets.match_delete( table, - {{:shape_node, shape_id}, {node_id, dep_index, polarity, next_condition_id}} + {{:shape_node, shape_id}, {node_id, dep_index, polarity, next_condition_id, branch_key}} ) :ets.match_delete( table, - {{:shape_dep_node, shape_id, dep_index}, {node_id, polarity, next_condition_id}} + {{:shape_dep_node, shape_id, dep_index}, + {node_id, polarity, next_condition_id, branch_key}} ) :ets.match_delete(table, {{:node_fallback, node_id}, {shape_id, next_condition_id}}) @@ -209,7 +235,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do def mark_ready(table, shape_handle) do :ets.delete(table, {:fallback, shape_handle}) - for {node_id, _dep_index, _polarity, _next_condition_id} <- + for {node_id, _dep_index, _polarity, _next_condition_id, _branch_key} <- nodes_for_shape(table, shape_handle) do :ets.match_delete(table, {{:node_fallback, node_id}, {shape_handle, :_}}) end @@ -222,7 +248,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do """ @spec add_value(t(), term(), [String.t()], non_neg_integer(), term()) :: :ok def add_value(table, shape_handle, subquery_ref, dep_index, value) do - for {node_id, polarity, next_condition_id} <- + for {node_id, polarity, next_condition_id, _branch_key} <- nodes_for_shape_dependency(table, shape_handle, dep_index) do case polarity do :positive -> @@ -248,7 +274,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do """ @spec remove_value(t(), term(), [String.t()], non_neg_integer(), term()) :: :ok def remove_value(table, shape_handle, subquery_ref, dep_index, value) do - for {node_id, polarity, next_condition_id} <- + for {node_id, polarity, next_condition_id, _branch_key} <- nodes_for_shape_dependency(table, shape_handle, dep_index) do case polarity do :positive -> @@ -366,7 +392,9 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do def positions_for_shape(table, shape_handle) do table |> nodes_for_shape(shape_handle) - |> Enum.map(fn {node_id, _dep_index, _polarity, _next_condition_id} -> node_id end) + |> Enum.map(fn {node_id, _dep_index, _polarity, _next_condition_id, _branch_key} -> + node_id + end) end defp ensure_node_meta(table, node_id, testexpr) do @@ -407,11 +435,11 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do |> Enum.map(&elem(&1, 1)) end - defp node_shape_entry_for_shape(table, shape_id, node_id) do + defp node_shape_entry_for_shape(table, shape_id, node_id, branch_key) do table |> nodes_for_shape(shape_id) |> Enum.find_value(fn - {^node_id, dep_index, polarity, next_condition_id} -> + {^node_id, dep_index, polarity, next_condition_id, ^branch_key} -> {dep_index, polarity, next_condition_id} _ -> @@ -427,7 +455,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do table |> :ets.lookup({:node_shape, node_id}) |> Enum.reduce(MapSet.new(), fn - {{:node_shape, ^node_id}, {shape_id, _dep_index, _polarity, next_condition_id}}, acc -> + {{:node_shape, ^node_id}, {shape_id, _dep_index, _polarity, next_condition_id, _branch_key}}, + acc -> MapSet.put(acc, {shape_id, next_condition_id}) _, acc -> diff --git a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex index 568df4b039..5e34b4831f 100644 --- a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex +++ b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex @@ -8,7 +8,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do Each WhereCondition is identified by a unique reference and stores: - `index_keys`: MapSet of {field_key, operation} tuples for indexed conditions - - `other_shapes`: map of shape_id -> where_clause for non-optimized shapes + - `other_shapes`: map of {shape_id, branch_key} -> where_clause for non-optimized shapes The logic for specific indexes (equality, inclusion) is handled by dedicated modules that also use ETS. """ @@ -38,23 +38,34 @@ defmodule Electric.Shapes.Filter.WhereCondition do @spec indexed_where?(Expr.t() | nil) :: boolean() def indexed_where?(where_clause), do: optimise_where(where_clause) != :not_optimised - def add_shape(%Filter{where_cond_table: table} = filter, condition_id, shape_id, where_clause) do + def add_shape(%Filter{} = filter, condition_id, shape_id, where_clause) do + add_shape(filter, condition_id, shape_id, where_clause, []) + end + + @doc false + def add_shape( + %Filter{where_cond_table: table} = filter, + condition_id, + shape_id, + where_clause, + branch_key + ) do case optimise_where(where_clause) do :not_optimised -> - add_shape_to_other_shapes(table, condition_id, shape_id, where_clause) + add_shape_to_other_shapes(table, condition_id, shape_id, branch_key, where_clause) {:or, left, right} -> - add_shape(filter, condition_id, shape_id, left) - add_shape(filter, condition_id, shape_id, right) + add_shape(filter, condition_id, shape_id, left, branch_key ++ [:left]) + add_shape(filter, condition_id, shape_id, right, branch_key ++ [:right]) %{operation: _} = optimisation -> - add_shape_to_index(filter, condition_id, shape_id, optimisation) + add_shape_to_index(filter, condition_id, shape_id, optimisation, branch_key) end end - defp add_shape_to_other_shapes(table, condition_id, shape_id, where_clause) do + defp add_shape_to_other_shapes(table, condition_id, shape_id, branch_key, where_clause) do [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - other_shapes = Map.put(other_shapes, shape_id, where_clause) + other_shapes = Map.put(other_shapes, {shape_id, branch_key}, where_clause) :ets.insert(table, {condition_id, {index_keys, other_shapes}}) end @@ -62,14 +73,15 @@ defmodule Electric.Shapes.Filter.WhereCondition do %Filter{where_cond_table: table} = filter, condition_id, shape_id, - optimisation + optimisation, + branch_key ) do [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) key = {optimisation.field, optimisation.operation} index_keys = MapSet.put(index_keys, key) :ets.insert(table, {condition_id, {index_keys, other_shapes}}) - Index.add_shape(filter, condition_id, shape_id, optimisation) + Index.add_shape(filter, condition_id, shape_id, optimisation, branch_key) end @doc false @@ -214,33 +226,39 @@ defmodule Electric.Shapes.Filter.WhereCondition do or `:ok` if the condition still has shapes. """ @spec remove_shape(Filter.t(), reference(), String.t(), Expr.t() | nil) :: :deleted | :ok + def remove_shape(%Filter{} = filter, condition_id, shape_id, where_clause) do + remove_shape(filter, condition_id, shape_id, where_clause, []) + end + + @doc false def remove_shape( %Filter{where_cond_table: table} = filter, condition_id, shape_id, - where_clause + where_clause, + branch_key ) do case optimise_where(where_clause) do :not_optimised -> - remove_shape_from_other_shapes(table, condition_id, shape_id) + remove_shape_from_other_shapes(table, condition_id, shape_id, branch_key) {:or, left, right} -> - _ = remove_shape(filter, condition_id, shape_id, left) - _ = remove_shape(filter, condition_id, shape_id, right) + _ = remove_shape(filter, condition_id, shape_id, left, branch_key ++ [:left]) + _ = remove_shape(filter, condition_id, shape_id, right, branch_key ++ [:right]) condition_status(table, condition_id) %{operation: _} = optimisation -> - remove_shape_from_index(filter, condition_id, shape_id, optimisation) + remove_shape_from_index(filter, condition_id, shape_id, optimisation, branch_key) end end - defp remove_shape_from_other_shapes(table, condition_id, shape_id) do + defp remove_shape_from_other_shapes(table, condition_id, shape_id, branch_key) do case :ets.lookup(table, condition_id) do [] -> :deleted [{_, {index_keys, other_shapes}}] -> - other_shapes = Map.delete(other_shapes, shape_id) + other_shapes = Map.delete(other_shapes, {shape_id, branch_key}) update_or_delete_condition(table, condition_id, index_keys, other_shapes) end end @@ -249,9 +267,10 @@ defmodule Electric.Shapes.Filter.WhereCondition do %Filter{where_cond_table: table} = filter, condition_id, shape_id, - optimisation + optimisation, + branch_key ) do - case Index.remove_shape(filter, condition_id, shape_id, optimisation) do + case Index.remove_shape(filter, condition_id, shape_id, optimisation, branch_key) do :deleted -> case :ets.lookup(table, condition_id) do [] -> @@ -330,7 +349,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do "filter.filter_other_shapes", [shape_count: map_size(other_shapes)], fn -> - for {shape_id, where} <- other_shapes, + for {{shape_id, _branch_key}, where} <- other_shapes, other_shape_matches?(index, shape_id, where, record), into: MapSet.new() do shape_id @@ -383,7 +402,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do end) other_shape_ids = - Enum.reduce(other_shapes, MapSet.new(), fn {shape_id, _}, acc -> + Enum.reduce(other_shapes, MapSet.new(), fn {{shape_id, _branch_key}, _}, acc -> MapSet.put(acc, shape_id) end) diff --git a/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs b/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs index 48df275c7a..bc5987ee59 100644 --- a/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs +++ b/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs @@ -69,21 +69,21 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexTest do register_node_shape(filter, table, condition_id, "s3") assert [ - {{:node_shape, {^condition_id, @field}}, {"s1", 0, :positive, _}}, - {{:node_shape, {^condition_id, @field}}, {"s2", 0, :positive, _}}, - {{:node_shape, {^condition_id, @field}}, {"s3", 0, :positive, _}} + {{:node_shape, {^condition_id, @field}}, {"s1", 0, :positive, _, []}}, + {{:node_shape, {^condition_id, @field}}, {"s2", 0, :positive, _, []}}, + {{:node_shape, {^condition_id, @field}}, {"s3", 0, :positive, _, []}} ] = Enum.sort(:ets.lookup(table, {:node_shape, {condition_id, @field}})) assert :ok = - SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation(), []) assert MapSet.new(["s2", "s3"]) == SubqueryIndex.all_shape_ids(filter, condition_id, @field) assert :ok = - SubqueryIndex.remove_shape(filter, condition_id, "s2", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s2", subquery_optimisation(), []) assert :deleted = - SubqueryIndex.remove_shape(filter, condition_id, "s3", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s3", subquery_optimisation(), []) assert [] == :ets.lookup(table, {:node_shape, {condition_id, @field}}) assert [] == :ets.lookup(table, {:node_meta, {condition_id, @field}}) @@ -150,7 +150,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexTest do SubqueryIndex.add_value(table, "s1", @subquery_ref, 0, 5) assert :deleted = - SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation(), []) refute SubqueryIndex.has_positions?(table, "s1") @@ -173,7 +173,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexTest do defp register_node_shape(filter, table, condition_id, shape_id, opts \\ []) do SubqueryIndex.register_shape(table, shape_id, make_plan(opts)) - :ok = SubqueryIndex.add_shape(filter, condition_id, shape_id, subquery_optimisation(opts)) + :ok = SubqueryIndex.add_shape(filter, condition_id, shape_id, subquery_optimisation(opts), []) end defp subquery_optimisation(opts \\ []) do diff --git a/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs b/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs index 1696e9eecb..580c3394de 100644 --- a/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs +++ b/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs @@ -136,7 +136,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexNodeTest do filter, condition_id, "shape1", - subquery_optimisation() + subquery_optimisation(), + [] ) refute SubqueryIndex.has_positions?(reverse_index, "shape1") @@ -149,7 +150,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexNodeTest do filter, condition_id, "shape2", - subquery_optimisation(field: @other_field) + subquery_optimisation(field: @other_field), + [] ) end end @@ -162,7 +164,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexNodeTest do filter, condition_id, shape_id, - subquery_optimisation(opts) + subquery_optimisation(opts), + [] ) end diff --git a/packages/sync-service/test/electric/shapes/filter_test.exs b/packages/sync-service/test/electric/shapes/filter_test.exs index 646acb24ed..0b6e87dcf9 100644 --- a/packages/sync-service/test/electric/shapes/filter_test.exs +++ b/packages/sync-service/test/electric/shapes/filter_test.exs @@ -498,6 +498,14 @@ defmodule Electric.Shapes.FilterTest do {%{"id" => "1", "number" => "4"}, false}, {%{"id" => "3", "number" => "3"}, false} ] + }, + %{ + where: "(id = 1 AND number > 2) OR (id = 1 AND number < 1)", + records: [ + {%{"id" => "1", "number" => "3"}, true}, + {%{"id" => "1", "number" => "0"}, true}, + {%{"id" => "1", "number" => "2"}, false} + ] } ] @@ -538,6 +546,10 @@ defmodule Electric.Shapes.FilterTest do Shape.new!("table", where: "id = 1 AND 1 = ANY(an_array)", inspector: @inspector), Shape.new!("table", where: "id = 1 OR id = 2", inspector: @inspector), Shape.new!("table", where: "id = 1 OR number > 5", inspector: @inspector), + Shape.new!("table", + where: "(id = 1 AND number > 2) OR (id = 1 AND number < 1)", + inspector: @inspector + ), Shape.new!("table", where: "id = 1 AND (number = 2 OR number = 3)", inspector: @inspector), Shape.new!("table", where: "(id = 1 OR id = 2) AND number = 3", inspector: @inspector), Shape.new!("table", where: "id IN (1, 2, 3)", inspector: @inspector),