Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stale-pears-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Optimise OR routing in sync-service filters by indexing OR branches when both sides are indexable, removing the dedicated IN special case, and falling back to `other_shapes` when an OR branch is not optimisable.
42 changes: 11 additions & 31 deletions packages/sync-service/lib/electric/shapes/filter/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,24 @@ defmodule Electric.Shapes.Filter.Index do
defp module_for("@>"), do: InclusionIndex
defp module_for("subquery"), do: SubqueryIndex

# "in" delegates to EqualityIndex, registering the shape under each value
def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: "in"} = optimisation) do
eq_base = optimisation |> Map.delete(:values) |> Map.put(:operation, "=")

for value <- optimisation.values do
EqualityIndex.add_shape(filter, where_cond_id, shape_id, Map.put(eq_base, :value, value))
end

:ok
end

def add_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do
module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation)
def add_shape(
%Filter{} = filter,
where_cond_id,
shape_id,
%{operation: op} = optimisation,
branch_key
) do
module_for(op).add_shape(filter, where_cond_id, shape_id, optimisation, branch_key)
end

def remove_shape(
%Filter{} = filter,
where_cond_id,
shape_id,
%{operation: "in"} = optimisation
%{operation: op} = optimisation,
branch_key
) do
eq_base = optimisation |> Map.delete(:values) |> Map.put(:operation, "=")

results =
for value <- optimisation.values do
EqualityIndex.remove_shape(
filter,
where_cond_id,
shape_id,
Map.put(eq_base, :value, value)
)
end

if :deleted in results, do: :deleted, else: :ok
end

def remove_shape(%Filter{} = filter, where_cond_id, shape_id, %{operation: op} = optimisation) do
module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation)
module_for(op).remove_shape(filter, where_cond_id, shape_id, optimisation, branch_key)
end

def affected_shapes(%Filter{} = filter, where_cond_id, field, operation, record) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do

@env Env.new()

def add_shape(%Filter{eq_index_table: table} = filter, condition_id, shape_id, optimisation) do
def add_shape(
%Filter{eq_index_table: table} = filter,
condition_id,
shape_id,
optimisation,
branch_key
) do
%{field: field, type: type, value: value, and_where: and_where} = optimisation
key = {condition_id, field, value}

Expand All @@ -39,10 +45,16 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do
existing_id
end

WhereCondition.add_shape(filter, next_condition_id, shape_id, and_where)
WhereCondition.add_shape(filter, next_condition_id, shape_id, and_where, branch_key)
end

def remove_shape(%Filter{eq_index_table: table} = filter, condition_id, shape_id, optimisation) do
def remove_shape(
%Filter{eq_index_table: table} = filter,
condition_id,
shape_id,
optimisation,
branch_key
) do
%{field: field, value: value, and_where: and_where} = optimisation
key = {condition_id, field, value}

Expand All @@ -51,7 +63,13 @@ defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do
:deleted

[{_, {_type, next_condition_id}}] ->
case WhereCondition.remove_shape(filter, next_condition_id, shape_id, and_where) do
case WhereCondition.remove_shape(
filter,
next_condition_id,
shape_id,
and_where,
branch_key
) do
:deleted ->
:ets.delete(table, key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do

@env Env.new()

def add_shape(%Filter{incl_index_table: table} = filter, condition_id, shape_id, optimisation) do
def add_shape(
%Filter{incl_index_table: table} = filter,
condition_id,
shape_id,
optimisation,
branch_key
) do
%{field: field, type: type, value: array_value, and_where: and_where} = optimisation
:ets.insert(table, {{:type, condition_id, field}, type})

Expand All @@ -42,7 +48,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do
condition_id: condition_id,
field: field,
shape_id: shape_id,
and_where: and_where
and_where: and_where,
branch_key: branch_key
}

add_shape_to_node(ctx, [], values)
Expand Down Expand Up @@ -92,7 +99,13 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do
existing_id
end

WhereCondition.add_shape(ctx.filter, node_condition_id, ctx.shape_id, ctx.and_where)
WhereCondition.add_shape(
ctx.filter,
node_condition_id,
ctx.shape_id,
ctx.and_where,
ctx.branch_key
)
end

defp get_or_create_node(table, node_key) do
Expand Down Expand Up @@ -125,7 +138,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do
%Filter{incl_index_table: table} = filter,
condition_id,
shape_id,
optimisation
optimisation,
branch_key
) do
%{field: field, value: array_value, and_where: and_where} = optimisation
ordered = array_value |> Enum.sort() |> Enum.dedup()
Expand All @@ -136,7 +150,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do
condition_id: condition_id,
field: field,
shape_id: shape_id,
and_where: and_where
and_where: and_where,
branch_key: branch_key
}

remove_shape_from_node(ctx, [], ordered)
Expand Down Expand Up @@ -198,7 +213,8 @@ defmodule Electric.Shapes.Filter.Indexes.InclusionIndex do
ctx.filter,
node_condition_id,
ctx.shape_id,
ctx.and_where
ctx.and_where,
ctx.branch_key
) do
:deleted ->
:ets.insert(ctx.table, {node_key, %{node | condition_id: nil}})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,33 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
@doc """
Register a shape on a concrete subquery filter node.
"""
@spec add_shape(Filter.t(), reference(), term(), map()) :: :ok
def add_shape(%Filter{subquery_index: table} = filter, condition_id, shape_id, optimisation) do
@spec add_shape(Filter.t(), reference(), term(), map(), [atom()]) :: :ok
def add_shape(
%Filter{subquery_index: table} = filter,
condition_id,
shape_id,
optimisation,
branch_key
) do
node_id = {condition_id, optimisation.field}
next_condition_id = make_ref()

WhereCondition.init(filter, next_condition_id)
WhereCondition.add_shape(filter, next_condition_id, shape_id, optimisation.and_where)

WhereCondition.add_shape(
filter,
next_condition_id,
shape_id,
optimisation.and_where,
branch_key
)

ensure_node_meta(table, node_id, optimisation.testexpr)

:ets.insert(
table,
{{:node_shape, node_id},
{shape_id, optimisation.dep_index, optimisation.polarity, next_condition_id}}
{shape_id, optimisation.dep_index, optimisation.polarity, next_condition_id, branch_key}}
)

if optimisation.polarity == :negated do
Expand All @@ -128,13 +141,13 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
:ets.insert(
table,
{{:shape_node, shape_id},
{node_id, optimisation.dep_index, optimisation.polarity, next_condition_id}}
{node_id, optimisation.dep_index, optimisation.polarity, next_condition_id, branch_key}}
)

:ets.insert(
table,
{{:shape_dep_node, shape_id, optimisation.dep_index},
{node_id, optimisation.polarity, next_condition_id}}
{node_id, optimisation.polarity, next_condition_id, branch_key}}
)

:ets.insert(table, {{:node_fallback, node_id}, {shape_id, next_condition_id}})
Expand All @@ -144,21 +157,33 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
@doc """
Remove a shape from a concrete subquery filter node.
"""
@spec remove_shape(Filter.t(), reference(), term(), map()) :: :deleted | :ok
def remove_shape(%Filter{subquery_index: table} = filter, condition_id, shape_id, optimisation) do
@spec remove_shape(Filter.t(), reference(), term(), map(), [atom()]) :: :deleted | :ok
def remove_shape(
%Filter{subquery_index: table} = filter,
condition_id,
shape_id,
optimisation,
branch_key
) do
node_id = {condition_id, optimisation.field}

case node_shape_entry_for_shape(table, shape_id, node_id) do
case node_shape_entry_for_shape(table, shape_id, node_id, branch_key) do
nil ->
:deleted

{dep_index, polarity, next_condition_id} ->
_ =
WhereCondition.remove_shape(filter, next_condition_id, shape_id, optimisation.and_where)
WhereCondition.remove_shape(
filter,
next_condition_id,
shape_id,
optimisation.and_where,
branch_key
)

:ets.match_delete(
table,
{{:node_shape, node_id}, {shape_id, dep_index, polarity, next_condition_id}}
{{:node_shape, node_id}, {shape_id, dep_index, polarity, next_condition_id, branch_key}}
)

if polarity == :negated do
Expand All @@ -170,12 +195,13 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do

:ets.match_delete(
table,
{{:shape_node, shape_id}, {node_id, dep_index, polarity, next_condition_id}}
{{:shape_node, shape_id}, {node_id, dep_index, polarity, next_condition_id, branch_key}}
)

:ets.match_delete(
table,
{{:shape_dep_node, shape_id, dep_index}, {node_id, polarity, next_condition_id}}
{{:shape_dep_node, shape_id, dep_index},
{node_id, polarity, next_condition_id, branch_key}}
)

:ets.match_delete(table, {{:node_fallback, node_id}, {shape_id, next_condition_id}})
Expand Down Expand Up @@ -209,7 +235,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
def mark_ready(table, shape_handle) do
:ets.delete(table, {:fallback, shape_handle})

for {node_id, _dep_index, _polarity, _next_condition_id} <-
for {node_id, _dep_index, _polarity, _next_condition_id, _branch_key} <-
nodes_for_shape(table, shape_handle) do
:ets.match_delete(table, {{:node_fallback, node_id}, {shape_handle, :_}})
end
Expand All @@ -222,7 +248,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
"""
@spec add_value(t(), term(), [String.t()], non_neg_integer(), term()) :: :ok
def add_value(table, shape_handle, subquery_ref, dep_index, value) do
for {node_id, polarity, next_condition_id} <-
for {node_id, polarity, next_condition_id, _branch_key} <-
nodes_for_shape_dependency(table, shape_handle, dep_index) do
case polarity do
:positive ->
Expand All @@ -248,7 +274,7 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
"""
@spec remove_value(t(), term(), [String.t()], non_neg_integer(), term()) :: :ok
def remove_value(table, shape_handle, subquery_ref, dep_index, value) do
for {node_id, polarity, next_condition_id} <-
for {node_id, polarity, next_condition_id, _branch_key} <-
nodes_for_shape_dependency(table, shape_handle, dep_index) do
case polarity do
:positive ->
Expand Down Expand Up @@ -366,7 +392,9 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
def positions_for_shape(table, shape_handle) do
table
|> nodes_for_shape(shape_handle)
|> Enum.map(fn {node_id, _dep_index, _polarity, _next_condition_id} -> node_id end)
|> Enum.map(fn {node_id, _dep_index, _polarity, _next_condition_id, _branch_key} ->
node_id
end)
end

defp ensure_node_meta(table, node_id, testexpr) do
Expand Down Expand Up @@ -407,11 +435,11 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
|> Enum.map(&elem(&1, 1))
end

defp node_shape_entry_for_shape(table, shape_id, node_id) do
defp node_shape_entry_for_shape(table, shape_id, node_id, branch_key) do
table
|> nodes_for_shape(shape_id)
|> Enum.find_value(fn
{^node_id, dep_index, polarity, next_condition_id} ->
{^node_id, dep_index, polarity, next_condition_id, ^branch_key} ->
{dep_index, polarity, next_condition_id}

_ ->
Expand All @@ -427,7 +455,8 @@ defmodule Electric.Shapes.Filter.Indexes.SubqueryIndex do
table
|> :ets.lookup({:node_shape, node_id})
|> Enum.reduce(MapSet.new(), fn
{{:node_shape, ^node_id}, {shape_id, _dep_index, _polarity, next_condition_id}}, acc ->
{{:node_shape, ^node_id}, {shape_id, _dep_index, _polarity, next_condition_id, _branch_key}},
acc ->
MapSet.put(acc, {shape_id, next_condition_id})

_, acc ->
Expand Down
Loading
Loading