diff --git a/.changeset/stale-pears-dance.md b/.changeset/stale-pears-dance.md new file mode 100644 index 0000000000..beb3f77709 --- /dev/null +++ b/.changeset/stale-pears-dance.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Optimise OR routing in sync-service filters by indexing OR branches when both sides are indexable, removing the dedicated IN special case, and falling back to `other_shapes` when an OR branch is not optimisable. diff --git a/packages/sync-service/lib/electric/shapes/filter/index.ex b/packages/sync-service/lib/electric/shapes/filter/index.ex index 001230feba..3420cd7067 100644 --- a/packages/sync-service/lib/electric/shapes/filter/index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/index.ex @@ -16,44 +16,24 @@ defmodule Electric.Shapes.Filter.Index do defp module_for("@>"), do: InclusionIndex defp module_for("subquery"), do: SubqueryIndex - # "in" delegates to EqualityIndex, registering the shape under each value - def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: "in"} = optimisation) do - eq_base = optimisation |> Map.delete(:values) |> Map.put(:operation, "=") - - for value <- optimisation.values do - EqualityIndex.add_shape(filter, where_cond_id, shape_id, Map.put(eq_base, :value, value)) - end - - :ok - end - - def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do - module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation) + def add_shape( + %Filter{} = filter, + where_cond_id, + shape_id, + %{operation: op} = optimisation, + branch_key + ) do + module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation, branch_key) end def remove_shape( %Filter{} = filter, where_cond_id, shape_id, - %{operation: "in"} = optimisation + %{operation: op} = optimisation, + branch_key ) do - eq_base = optimisation |> Map.delete(:values) |> Map.put(:operation, "=") - - results = - for value <- optimisation.values do - EqualityIndex.remove_shape( - filter, - where_cond_id, - shape_id, - Map.put(eq_base, :value, value) - ) - end - - if :deleted in results, do: :deleted, else: :ok - end - - def remove_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do - module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation) + module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation, branch_key) end def affected_shapes(%Filter{} = filter, where_cond_id, field, operation, record) do diff --git a/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex b/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex index 421e375c95..8df4b77067 100644 --- a/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/indexes/equality_index.ex @@ -20,7 +20,13 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do @env Env.new() - def add_shape(%Filter{eq_index_table: table} = filter, condition_id, shape_id, optimisation) do + def add_shape( + %Filter{eq_index_table: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do %{field: field, type: type, value: value, and_where: and_where} = optimisation key = {condition_id, field, value} @@ -39,10 +45,16 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do existing_id end - WhereCondition.add_shape(filter, next_condition_id, shape_id, and_where) + WhereCondition.add_shape(filter, next_condition_id, shape_id, and_where, branch_key) end - def remove_shape(%Filter{eq_index_table: table} = filter, condition_id, shape_id, optimisation) do + def remove_shape( + %Filter{eq_index_table: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do %{field: field, value: value, and_where: and_where} = optimisation key = {condition_id, field, value} @@ -51,7 +63,13 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do :deleted [{_, {_type, next_condition_id}}] -> - case WhereCondition.remove_shape(filter, next_condition_id, shape_id, and_where) do + case WhereCondition.remove_shape( + filter, + next_condition_id, + shape_id, + and_where, + branch_key + ) do :deleted -> :ets.delete(table, key) diff --git a/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex b/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex index f6ba03ec8e..0d0710cb14 100644 --- a/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/indexes/inclusion_index.ex @@ -30,7 +30,13 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do @env Env.new() - def add_shape(%Filter{incl_index_table: table} = filter, condition_id, shape_id, optimisation) do + def add_shape( + %Filter{incl_index_table: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do %{field: field, type: type, value: array_value, and_where: and_where} = optimisation :ets.insert(table, {{:type, condition_id, field}, type}) @@ -42,7 +48,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do condition_id: condition_id, field: field, shape_id: shape_id, - and_where: and_where + and_where: and_where, + branch_key: branch_key } add_shape_to_node(ctx, [], values) @@ -92,7 +99,13 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do existing_id end - WhereCondition.add_shape(ctx.filter, node_condition_id, ctx.shape_id, ctx.and_where) + WhereCondition.add_shape( + ctx.filter, + node_condition_id, + ctx.shape_id, + ctx.and_where, + ctx.branch_key + ) end defp get_or_create_node(table, node_key) do @@ -125,7 +138,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do %Filter{incl_index_table: table} = filter, condition_id, shape_id, - optimisation + optimisation, + branch_key ) do %{field: field, value: array_value, and_where: and_where} = optimisation ordered = array_value |> Enum.sort() |> Enum.dedup() @@ -136,7 +150,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do condition_id: condition_id, field: field, shape_id: shape_id, - and_where: and_where + and_where: and_where, + branch_key: branch_key } remove_shape_from_node(ctx, [], ordered) @@ -198,7 +213,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do ctx.filter, node_condition_id, ctx.shape_id, - ctx.and_where + ctx.and_where, + ctx.branch_key ) do :deleted -> :ets.insert(ctx.table, {node_key, %{node | condition_id: nil}}) diff --git a/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex b/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex index a34d29d962..47632a854d 100644 --- a/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex +++ b/packages/sync-service/lib/electric/shapes/filter/indexes/subquery_index.ex @@ -105,20 +105,33 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do @doc """ Register a shape on a concrete subquery filter node. """ - @spec add_shape(Filter.t(), reference(), term(), map()) :: :ok - def add_shape(%Filter{subquery_index: table} = filter, condition_id, shape_id, optimisation) do + @spec add_shape(Filter.t(), reference(), term(), map(), [atom()]) :: :ok + def add_shape( + %Filter{subquery_index: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do node_id = {condition_id, optimisation.field} next_condition_id = make_ref() WhereCondition.init(filter, next_condition_id) - WhereCondition.add_shape(filter, next_condition_id, shape_id, optimisation.and_where) + + WhereCondition.add_shape( + filter, + next_condition_id, + shape_id, + optimisation.and_where, + branch_key + ) ensure_node_meta(table, node_id, optimisation.testexpr) :ets.insert( table, {{:node_shape, node_id}, - {shape_id, optimisation.dep_index, optimisation.polarity, next_condition_id}} + {shape_id, optimisation.dep_index, optimisation.polarity, next_condition_id, branch_key}} ) if optimisation.polarity == :negated do @@ -128,13 +141,13 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do :ets.insert( table, {{:shape_node, shape_id}, - {node_id, optimisation.dep_index, optimisation.polarity, next_condition_id}} + {node_id, optimisation.dep_index, optimisation.polarity, next_condition_id, branch_key}} ) :ets.insert( table, {{:shape_dep_node, shape_id, optimisation.dep_index}, - {node_id, optimisation.polarity, next_condition_id}} + {node_id, optimisation.polarity, next_condition_id, branch_key}} ) :ets.insert(table, {{:node_fallback, node_id}, {shape_id, next_condition_id}}) @@ -144,21 +157,33 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do @doc """ Remove a shape from a concrete subquery filter node. """ - @spec remove_shape(Filter.t(), reference(), term(), map()) :: :deleted | :ok - def remove_shape(%Filter{subquery_index: table} = filter, condition_id, shape_id, optimisation) do + @spec remove_shape(Filter.t(), reference(), term(), map(), [atom()]) :: :deleted | :ok + def remove_shape( + %Filter{subquery_index: table} = filter, + condition_id, + shape_id, + optimisation, + branch_key + ) do node_id = {condition_id, optimisation.field} - case node_shape_entry_for_shape(table, shape_id, node_id) do + case node_shape_entry_for_shape(table, shape_id, node_id, branch_key) do nil -> :deleted {dep_index, polarity, next_condition_id} -> _ = - WhereCondition.remove_shape(filter, next_condition_id, shape_id, optimisation.and_where) + WhereCondition.remove_shape( + filter, + next_condition_id, + shape_id, + optimisation.and_where, + branch_key + ) :ets.match_delete( table, - {{:node_shape, node_id}, {shape_id, dep_index, polarity, next_condition_id}} + {{:node_shape, node_id}, {shape_id, dep_index, polarity, next_condition_id, branch_key}} ) if polarity == :negated do @@ -170,12 +195,13 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do :ets.match_delete( table, - {{:shape_node, shape_id}, {node_id, dep_index, polarity, next_condition_id}} + {{:shape_node, shape_id}, {node_id, dep_index, polarity, next_condition_id, branch_key}} ) :ets.match_delete( table, - {{:shape_dep_node, shape_id, dep_index}, {node_id, polarity, next_condition_id}} + {{:shape_dep_node, shape_id, dep_index}, + {node_id, polarity, next_condition_id, branch_key}} ) :ets.match_delete(table, {{:node_fallback, node_id}, {shape_id, next_condition_id}}) @@ -209,7 +235,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do def mark_ready(table, shape_handle) do :ets.delete(table, {:fallback, shape_handle}) - for {node_id, _dep_index, _polarity, _next_condition_id} <- + for {node_id, _dep_index, _polarity, _next_condition_id, _branch_key} <- nodes_for_shape(table, shape_handle) do :ets.match_delete(table, {{:node_fallback, node_id}, {shape_handle, :_}}) end @@ -222,7 +248,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do """ @spec add_value(t(), term(), [String.t()], non_neg_integer(), term()) :: :ok def add_value(table, shape_handle, subquery_ref, dep_index, value) do - for {node_id, polarity, next_condition_id} <- + for {node_id, polarity, next_condition_id, _branch_key} <- nodes_for_shape_dependency(table, shape_handle, dep_index) do case polarity do :positive -> @@ -248,7 +274,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do """ @spec remove_value(t(), term(), [String.t()], non_neg_integer(), term()) :: :ok def remove_value(table, shape_handle, subquery_ref, dep_index, value) do - for {node_id, polarity, next_condition_id} <- + for {node_id, polarity, next_condition_id, _branch_key} <- nodes_for_shape_dependency(table, shape_handle, dep_index) do case polarity do :positive -> @@ -366,7 +392,9 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do def positions_for_shape(table, shape_handle) do table |> nodes_for_shape(shape_handle) - |> Enum.map(fn {node_id, _dep_index, _polarity, _next_condition_id} -> node_id end) + |> Enum.map(fn {node_id, _dep_index, _polarity, _next_condition_id, _branch_key} -> + node_id + end) end defp ensure_node_meta(table, node_id, testexpr) do @@ -407,11 +435,11 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do |> Enum.map(&elem(&1, 1)) end - defp node_shape_entry_for_shape(table, shape_id, node_id) do + defp node_shape_entry_for_shape(table, shape_id, node_id, branch_key) do table |> nodes_for_shape(shape_id) |> Enum.find_value(fn - {^node_id, dep_index, polarity, next_condition_id} -> + {^node_id, dep_index, polarity, next_condition_id, ^branch_key} -> {dep_index, polarity, next_condition_id} _ -> @@ -427,7 +455,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do table |> :ets.lookup({:node_shape, node_id}) |> Enum.reduce(MapSet.new(), fn - {{:node_shape, ^node_id}, {shape_id, _dep_index, _polarity, next_condition_id}}, acc -> + {{:node_shape, ^node_id}, {shape_id, _dep_index, _polarity, next_condition_id, _branch_key}}, + acc -> MapSet.put(acc, {shape_id, next_condition_id}) _, acc -> diff --git a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex index 1238de8516..5e34b4831f 100644 --- a/packages/sync-service/lib/electric/shapes/filter/where_condition.ex +++ b/packages/sync-service/lib/electric/shapes/filter/where_condition.ex @@ -8,7 +8,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do Each WhereCondition is identified by a unique reference and stores: - `index_keys`: MapSet of {field_key, operation} tuples for indexed conditions - - `other_shapes`: map of shape_id -> where_clause for non-optimized shapes + - `other_shapes`: map of {shape_id, branch_key} -> where_clause for non-optimized shapes The logic for specific indexes (equality, inclusion) is handled by dedicated modules that also use ETS. """ @@ -19,6 +19,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do alias Electric.Replication.Eval.Parser.Func alias Electric.Replication.Eval.Parser.Ref alias Electric.Replication.Eval.Parser.RowExpr + alias Electric.Replication.PostgresInterop.Casting alias Electric.Shapes.Filter alias Electric.Shapes.Filter.Index alias Electric.Shapes.WhereClause @@ -31,41 +32,84 @@ defmodule Electric.Shapes.Filter.WhereCondition do end @doc """ - Returns `true` when the WHERE clause can use the primary equality/inclusion - indexes maintained by the filter. + Returns `true` when the WHERE clause can use the filter's indexed routing + path instead of relying entirely on `other_shapes`. """ @spec indexed_where?(Expr.t() | nil) :: boolean() def indexed_where?(where_clause), do: optimise_where(where_clause) != :not_optimised - def add_shape(%Filter{where_cond_table: table} = filter, condition_id, shape_id, where_clause) do + def add_shape(%Filter{} = filter, condition_id, shape_id, where_clause) do + add_shape(filter, condition_id, shape_id, where_clause, []) + end + + @doc false + def add_shape( + %Filter{where_cond_table: table} = filter, + condition_id, + shape_id, + where_clause, + branch_key + ) do case optimise_where(where_clause) do :not_optimised -> - [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - other_shapes = Map.put(other_shapes, shape_id, where_clause) - :ets.insert(table, {condition_id, {index_keys, other_shapes}}) + add_shape_to_other_shapes(table, condition_id, shape_id, branch_key, where_clause) - optimisation -> - add_shape_to_index(filter, condition_id, shape_id, optimisation) + {:or, left, right} -> + add_shape(filter, condition_id, shape_id, left, branch_key ++ [:left]) + add_shape(filter, condition_id, shape_id, right, branch_key ++ [:right]) + + %{operation: _} = optimisation -> + add_shape_to_index(filter, condition_id, shape_id, optimisation, branch_key) end end + defp add_shape_to_other_shapes(table, condition_id, shape_id, branch_key, where_clause) do + [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) + other_shapes = Map.put(other_shapes, {shape_id, branch_key}, where_clause) + :ets.insert(table, {condition_id, {index_keys, other_shapes}}) + end + defp add_shape_to_index( %Filter{where_cond_table: table} = filter, condition_id, shape_id, - optimisation + optimisation, + branch_key ) do [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - key = {optimisation.field, index_key(optimisation.operation)} + key = {optimisation.field, optimisation.operation} index_keys = MapSet.put(index_keys, key) :ets.insert(table, {condition_id, {index_keys, other_shapes}}) - Index.add_shape(filter, condition_id, shape_id, optimisation) + Index.add_shape(filter, condition_id, shape_id, optimisation, branch_key) end @doc false + defp optimise_where(nil), do: :not_optimised + defp optimise_where(%Expr{eval: eval}), do: optimise_where(eval) + defp optimise_where(%Func{name: "or", args: [left, right]}) do + if optimise_where(left) != :not_optimised and optimise_where(right) != :not_optimised do + {:or, where_expr(left), where_expr(right)} + else + :not_optimised + end + end + + defp optimise_where(%Func{name: "and", args: [left, right]}) do + case {optimise_where(left), optimise_where(right)} do + {%{operation: _} = optimisation, _} -> + %{optimisation | and_where: merge_and_where(optimisation.and_where, right)} + + {_, %{operation: _} = optimisation} -> + %{optimisation | and_where: merge_and_where(left, optimisation.and_where)} + + _ -> + :not_optimised + end + end + defp optimise_where(%Func{ name: ~s("="), args: [%Ref{path: [field], type: type}, %Const{value: value}] @@ -122,40 +166,38 @@ defmodule Electric.Shapes.Filter.WhereCondition do subquery_optimisation(subquery, :negated) end - # field IN (const1, const2, ...) → reuse = index with multiple values - defp optimise_where(%Func{name: "or"} = expr) do - case flatten_or_equalities(expr) do - {:ok, field, type, values} -> - %{operation: "in", field: field, type: type, values: values, and_where: nil} + defp optimise_where(_), do: :not_optimised - :error -> - :not_optimised - end + defp where_expr(eval) do + %Expr{eval: eval, used_refs: Parser.find_refs(eval), returns: :bool} end - defp optimise_where(%Func{name: "and", args: [arg1, arg2]}) do - case {optimise_where(arg1), optimise_where(arg2)} do - {%{operation: op, and_where: nil} = params, _} when op in ["=", "@>", "in", "subquery"] -> - %{params | and_where: where_expr(arg2)} + defp merge_and_where(nil, nil), do: nil - {_, %{operation: op, and_where: nil} = params} when op in ["=", "@>", "in", "subquery"] -> - %{params | and_where: where_expr(arg1)} + defp merge_and_where(left, nil), do: normalise_where(left) - _ -> - :not_optimised - end - end - - defp optimise_where(_), do: :not_optimised + defp merge_and_where(nil, right), do: normalise_where(right) - # "in" shares the EqualityIndex with "=", so use the same index key - defp index_key("in"), do: "=" - defp index_key(op), do: op + defp merge_and_where(left, right) do + left_eval = extract_eval(left) + right_eval = extract_eval(right) - defp where_expr(eval) do - %Expr{eval: eval, used_refs: Parser.find_refs(eval), returns: :bool} + where_expr(%Func{ + args: [left_eval, right_eval], + type: :bool, + implementation: &Casting.pg_and/2, + name: "and", + strict?: false, + location: min(Map.get(left_eval, :location, 0), Map.get(right_eval, :location, 0)) + }) end + defp normalise_where(%Expr{} = expr), do: expr + defp normalise_where(eval), do: where_expr(eval) + + defp extract_eval(%Expr{eval: eval}), do: eval + defp extract_eval(eval), do: eval + defp subquery_optimisation( %Func{name: "sublink_membership_check", args: [testexpr, %Ref{path: subquery_ref}]} = _subquery, @@ -177,44 +219,6 @@ defmodule Electric.Shapes.Filter.WhereCondition do end end - # Flatten an OR chain of equalities on the same field into {field, type, [values]} - defp flatten_or_equalities(expr) do - case collect_or_equalities(expr, []) do - {:ok, [{field, type, _} | _] = pairs} -> - if Enum.all?(pairs, fn {f, t, _} -> f == field and t == type end) do - values = Enum.map(pairs, fn {_, _, v} -> v end) - {:ok, field, type, values} - else - :error - end - - _ -> - :error - end - end - - defp collect_or_equalities(%Func{name: "or", args: [left, right]}, acc) do - with {:ok, acc} <- collect_or_equalities(left, acc) do - collect_or_equalities(right, acc) - end - end - - defp collect_or_equalities( - %Func{name: ~s("="), args: [%Ref{path: [field], type: type}, %Const{value: value}]}, - acc - ) do - {:ok, [{field, type, value} | acc]} - end - - defp collect_or_equalities( - %Func{name: ~s("="), args: [%Const{value: value}, %Ref{path: [field], type: type}]}, - acc - ) do - {:ok, [{field, type, value} | acc]} - end - - defp collect_or_equalities(_, _acc), do: :error - @doc """ Remove a shape from a WhereCondition. @@ -222,45 +226,74 @@ defmodule Electric.Shapes.Filter.WhereCondition do or `:ok` if the condition still has shapes. """ @spec remove_shape(Filter.t(), reference(), String.t(), Expr.t() | nil) :: :deleted | :ok + def remove_shape(%Filter{} = filter, condition_id, shape_id, where_clause) do + remove_shape(filter, condition_id, shape_id, where_clause, []) + end + + @doc false def remove_shape( %Filter{where_cond_table: table} = filter, condition_id, shape_id, - where_clause + where_clause, + branch_key ) do case optimise_where(where_clause) do :not_optimised -> - remove_shape_from_other_shapes(table, condition_id, shape_id) + remove_shape_from_other_shapes(table, condition_id, shape_id, branch_key) + + {:or, left, right} -> + _ = remove_shape(filter, condition_id, shape_id, left, branch_key ++ [:left]) + _ = remove_shape(filter, condition_id, shape_id, right, branch_key ++ [:right]) + condition_status(table, condition_id) - optimisation -> - remove_shape_from_index(filter, condition_id, shape_id, optimisation) + %{operation: _} = optimisation -> + remove_shape_from_index(filter, condition_id, shape_id, optimisation, branch_key) end end - defp remove_shape_from_other_shapes(table, condition_id, shape_id) do - [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - other_shapes = Map.delete(other_shapes, shape_id) - update_or_delete_condition(table, condition_id, index_keys, other_shapes) + defp remove_shape_from_other_shapes(table, condition_id, shape_id, branch_key) do + case :ets.lookup(table, condition_id) do + [] -> + :deleted + + [{_, {index_keys, other_shapes}}] -> + other_shapes = Map.delete(other_shapes, {shape_id, branch_key}) + update_or_delete_condition(table, condition_id, index_keys, other_shapes) + end end defp remove_shape_from_index( %Filter{where_cond_table: table} = filter, condition_id, shape_id, - optimisation + optimisation, + branch_key ) do - case Index.remove_shape(filter, condition_id, shape_id, optimisation) do + case Index.remove_shape(filter, condition_id, shape_id, optimisation, branch_key) do :deleted -> - [{_, {index_keys, other_shapes}}] = :ets.lookup(table, condition_id) - key = {optimisation.field, index_key(optimisation.operation)} - index_keys = MapSet.delete(index_keys, key) - update_or_delete_condition(table, condition_id, index_keys, other_shapes) + case :ets.lookup(table, condition_id) do + [] -> + :deleted + + [{_, {index_keys, other_shapes}}] -> + key = {optimisation.field, optimisation.operation} + index_keys = MapSet.delete(index_keys, key) + update_or_delete_condition(table, condition_id, index_keys, other_shapes) + end :ok -> :ok end end + defp condition_status(table, condition_id) do + case :ets.lookup(table, condition_id) do + [] -> :deleted + [_] -> :ok + end + end + defp update_or_delete_condition(table, condition_id, index_keys, other_shapes) when index_keys == %MapSet{} and other_shapes == %{} do :ets.delete(table, condition_id) @@ -316,7 +349,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do "filter.filter_other_shapes", [shape_count: map_size(other_shapes)], fn -> - for {shape_id, where} <- other_shapes, + for {{shape_id, _branch_key}, where} <- other_shapes, other_shape_matches?(index, shape_id, where, record), into: MapSet.new() do shape_id @@ -369,7 +402,7 @@ defmodule Electric.Shapes.Filter.WhereCondition do end) other_shape_ids = - Enum.reduce(other_shapes, MapSet.new(), fn {shape_id, _}, acc -> + Enum.reduce(other_shapes, MapSet.new(), fn {{shape_id, _branch_key}, _}, acc -> MapSet.put(acc, shape_id) end) diff --git a/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs b/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs index 48df275c7a..bc5987ee59 100644 --- a/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs +++ b/packages/sync-service/test/electric/shapes/filter/subquery_index_test.exs @@ -69,21 +69,21 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexTest do register_node_shape(filter, table, condition_id, "s3") assert [ - {{:node_shape, {^condition_id, @field}}, {"s1", 0, :positive, _}}, - {{:node_shape, {^condition_id, @field}}, {"s2", 0, :positive, _}}, - {{:node_shape, {^condition_id, @field}}, {"s3", 0, :positive, _}} + {{:node_shape, {^condition_id, @field}}, {"s1", 0, :positive, _, []}}, + {{:node_shape, {^condition_id, @field}}, {"s2", 0, :positive, _, []}}, + {{:node_shape, {^condition_id, @field}}, {"s3", 0, :positive, _, []}} ] = Enum.sort(:ets.lookup(table, {:node_shape, {condition_id, @field}})) assert :ok = - SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation(), []) assert MapSet.new(["s2", "s3"]) == SubqueryIndex.all_shape_ids(filter, condition_id, @field) assert :ok = - SubqueryIndex.remove_shape(filter, condition_id, "s2", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s2", subquery_optimisation(), []) assert :deleted = - SubqueryIndex.remove_shape(filter, condition_id, "s3", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s3", subquery_optimisation(), []) assert [] == :ets.lookup(table, {:node_shape, {condition_id, @field}}) assert [] == :ets.lookup(table, {:node_meta, {condition_id, @field}}) @@ -150,7 +150,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexTest do SubqueryIndex.add_value(table, "s1", @subquery_ref, 0, 5) assert :deleted = - SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation()) + SubqueryIndex.remove_shape(filter, condition_id, "s1", subquery_optimisation(), []) refute SubqueryIndex.has_positions?(table, "s1") @@ -173,7 +173,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexTest do defp register_node_shape(filter, table, condition_id, shape_id, opts \\ []) do SubqueryIndex.register_shape(table, shape_id, make_plan(opts)) - :ok = SubqueryIndex.add_shape(filter, condition_id, shape_id, subquery_optimisation(opts)) + :ok = SubqueryIndex.add_shape(filter, condition_id, shape_id, subquery_optimisation(opts), []) end defp subquery_optimisation(opts \\ []) do diff --git a/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs b/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs index 1696e9eecb..580c3394de 100644 --- a/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs +++ b/packages/sync-service/test/electric/shapes/filter/subquery_node_test.exs @@ -136,7 +136,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexNodeTest do filter, condition_id, "shape1", - subquery_optimisation() + subquery_optimisation(), + [] ) refute SubqueryIndex.has_positions?(reverse_index, "shape1") @@ -149,7 +150,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexNodeTest do filter, condition_id, "shape2", - subquery_optimisation(field: @other_field) + subquery_optimisation(field: @other_field), + [] ) end end @@ -162,7 +164,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndexNodeTest do filter, condition_id, shape_id, - subquery_optimisation(opts) + subquery_optimisation(opts), + [] ) end diff --git a/packages/sync-service/test/electric/shapes/filter_test.exs b/packages/sync-service/test/electric/shapes/filter_test.exs index 1cd9f48519..0b6e87dcf9 100644 --- a/packages/sync-service/test/electric/shapes/filter_test.exs +++ b/packages/sync-service/test/electric/shapes/filter_test.exs @@ -29,6 +29,18 @@ defmodule Electric.Shapes.FilterTest do assert Filter.indexed_shape?(shape) end + test "returns true for OR shapes when both branches are indexed" do + shape = Shape.new!("t1", where: "id = 7 OR id = 8", inspector: @inspector) + + assert Filter.indexed_shape?(shape) + end + + test "returns false for OR shapes when one branch is not indexed" do + shape = Shape.new!("t1", where: "id = 7 OR number > 5", inspector: @inspector) + + refute Filter.indexed_shape?(shape) + end + test "returns false for shapes without an indexable where clause" do shape = Shape.new!("t1", inspector: @inspector) @@ -460,6 +472,40 @@ defmodule Electric.Shapes.FilterTest do {%{"id" => "1", "number" => "3"}, false}, {%{"id" => "3", "number" => "6"}, false} ] + }, + %{ + where: "id = 1 OR id = 2", + records: [ + {%{"id" => "1"}, true}, + {%{"id" => "2"}, true}, + {%{"id" => "3"}, false} + ] + }, + %{ + where: "id = 1 AND (number = 2 OR number = 3)", + records: [ + {%{"id" => "1", "number" => "2"}, true}, + {%{"id" => "1", "number" => "3"}, true}, + {%{"id" => "1", "number" => "4"}, false}, + {%{"id" => "2", "number" => "2"}, false} + ] + }, + %{ + where: "(id = 1 OR id = 2) AND number = 3", + records: [ + {%{"id" => "1", "number" => "3"}, true}, + {%{"id" => "2", "number" => "3"}, true}, + {%{"id" => "1", "number" => "4"}, false}, + {%{"id" => "3", "number" => "3"}, false} + ] + }, + %{ + where: "(id = 1 AND number > 2) OR (id = 1 AND number < 1)", + records: [ + {%{"id" => "1", "number" => "3"}, true}, + {%{"id" => "1", "number" => "0"}, true}, + {%{"id" => "1", "number" => "2"}, false} + ] } ] @@ -498,9 +544,27 @@ defmodule Electric.Shapes.FilterTest do Shape.new!("table", where: "1 = ANY(an_array)", inspector: @inspector), Shape.new!("table", where: "2 = ANY(an_array)", inspector: @inspector), Shape.new!("table", where: "id = 1 AND 1 = ANY(an_array)", inspector: @inspector), + Shape.new!("table", where: "id = 1 OR id = 2", inspector: @inspector), + Shape.new!("table", where: "id = 1 OR number > 5", inspector: @inspector), + Shape.new!("table", + where: "(id = 1 AND number > 2) OR (id = 1 AND number < 1)", + inspector: @inspector + ), + Shape.new!("table", where: "id = 1 AND (number = 2 OR number = 3)", inspector: @inspector), + Shape.new!("table", where: "(id = 1 OR id = 2) AND number = 3", inspector: @inspector), Shape.new!("table", where: "id IN (1, 2, 3)", inspector: @inspector), Shape.new!("table", where: "id IN (4, 5)", inspector: @inspector), Shape.new!("table", where: "id IN (1, 2) AND number > 5", inspector: @inspector), + Shape.new!("table", + where: "id IN (SELECT id FROM another_table) OR id = 1", + inspector: @inspector, + feature_flags: ["allow_subqueries"] + ), + Shape.new!("table", + where: "id IN (SELECT id FROM another_table) OR number > 5", + inspector: @inspector, + feature_flags: ["allow_subqueries"] + ), Shape.new!("table", where: "id IN (SELECT id FROM another_table)", inspector: @inspector, @@ -765,7 +829,7 @@ defmodule Electric.Shapes.FilterTest do test "where clause in the form `const = ANY(array_field)` is optimised" do # Same shape count as @> but higher budget per shape because the ANY - # AST is deeper to pattern-match through optimise_where + # AST is deeper to pattern-match through the planner shape_count = @shape_count * 5 max_reductions = @max_reductions * 10 @@ -790,15 +854,16 @@ defmodule Electric.Shapes.FilterTest do end) end - test "where clause in the form `field IN (const1, const2, ...)` is optimised" do + test "where clause in the form `field = const1 OR field = const2` is optimised" do + max_reductions = @max_reductions * 2 filter = Filter.new() Enum.each(1..@shape_count, fn i -> shape = - Shape.new!("t1", where: "id IN (#{i}, #{i + @shape_count})", inspector: @inspector) + Shape.new!("t1", where: "id = #{i} OR id = #{i + @shape_count}", inspector: @inspector) add_reductions = reductions(fn -> Filter.add_shape(filter, i, shape) end) - assert add_reductions < @max_reductions + assert add_reductions < max_reductions end) change = change("t1", %{"id" => "7"}) @@ -806,11 +871,11 @@ defmodule Electric.Shapes.FilterTest do affected_reductions = reductions(fn -> Filter.affected_shapes(filter, change) end) - assert affected_reductions < @max_reductions + assert affected_reductions < max_reductions Enum.each(1..@shape_count, fn i -> remove_reductions = reductions(fn -> Filter.remove_shape(filter, i) end) - assert remove_reductions < @max_reductions + assert remove_reductions < max_reductions end) end @@ -1007,13 +1072,11 @@ defmodule Electric.Shapes.FilterTest do "CREATE TABLE IF NOT EXISTS or_parent (id INT PRIMARY KEY)", "CREATE TABLE IF NOT EXISTS or_child (id INT PRIMARY KEY, par_id INT REFERENCES or_parent(id), value TEXT NOT NULL)" ] - test "non-optimisable OR+subquery shape is affected by root table changes", %{ + test "optimisable OR+subquery shape is affected by root table changes", %{ inspector: inspector } do - # Shape with OR combining a subquery and a simple condition. - # OR is not optimisable, so the shape lands in other_shapes AND - # gets registered in the sublink inverted index. Root table changes - # must still be routed to this shape once seeded. + # Both OR branches are indexable, so the shape is registered twice in the + # filter tree: once for the subquery branch and once for the simple branch. {:ok, shape} = Shape.new("or_child", inspector: inspector, @@ -1067,6 +1130,53 @@ defmodule Electric.Shapes.FilterTest do assert Filter.affected_shapes(filter, update_into_shape) == MapSet.new(["shape1"]) end + @tag with_sql: [ + "CREATE TABLE IF NOT EXISTS or_parent (id INT PRIMARY KEY)", + "CREATE TABLE IF NOT EXISTS or_child (id INT PRIMARY KEY, par_id INT REFERENCES or_parent(id), value TEXT NOT NULL)" + ] + test "mixed OR+subquery shape falls back to other_shapes verification", %{ + inspector: inspector + } do + {:ok, shape} = + Shape.new("or_child", + inspector: inspector, + where: "par_id IN (SELECT id FROM or_parent) OR value LIKE 'target%'" + ) + + filter = Filter.new() + filter = Filter.add_shape(filter, "shape1", shape) + + index = Filter.subquery_index(filter) + subquery_ref = ["$sublink", "0"] + + for value <- [1, 2, 3] do + SubqueryIndex.add_value(index, "shape1", subquery_ref, 0, value) + end + + SubqueryIndex.mark_ready(index, "shape1") + + insert_matching_value = %NewRecord{ + relation: {"public", "or_child"}, + record: %{"id" => "99", "par_id" => "99", "value" => "target-me"} + } + + assert Filter.affected_shapes(filter, insert_matching_value) == MapSet.new(["shape1"]) + + insert_matching_subquery = %NewRecord{ + relation: {"public", "or_child"}, + record: %{"id" => "10", "par_id" => "2", "value" => "other"} + } + + assert Filter.affected_shapes(filter, insert_matching_subquery) == MapSet.new(["shape1"]) + + insert_no_match = %NewRecord{ + relation: {"public", "or_child"}, + record: %{"id" => "50", "par_id" => "99", "value" => "other"} + } + + assert Filter.affected_shapes(filter, insert_no_match) == MapSet.new([]) + end + @tag with_sql: [ "CREATE TABLE IF NOT EXISTS like_parent_unseeded (id INT PRIMARY KEY)", "CREATE TABLE IF NOT EXISTS like_child_unseeded (id INT PRIMARY KEY, name TEXT NOT NULL, parent_id INT REFERENCES like_parent_unseeded(id))"