Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/suppress-publication-config-casts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Suppress redundant publication-configuration casts while a submission is already in flight. Under a burst of shape arrivals, the publication manager's RelationTracker no longer mints one Configurator cast per add_shape/remove_shape call, which prevents the `publication_manager_configurator` mailbox from growing without bound (issue #4396).
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
prepared_relation_filters: MapSet.new(),
submitted_relation_filters: MapSet.new(),
committed_relation_filters: MapSet.new(),
# Snapshot of the filter set carried by the cast currently being processed
# by the Configurator. While set, the "committed lags submitted" retry in
# update_needed?/1 is suppressed so a burst of add_shape/remove_shape calls
# cannot mint one redundant cast each. Cleared on a terminal signal from the
# Configurator (full commit, per-relation error, or global error).
in_flight_relation_filters: nil,
waiters: %{},
# start with optimistic assumption about what the
# publication supports (altering and generated columns)
Expand All @@ -48,6 +54,7 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
prepared_relation_filters: internal_relation_filters(),
submitted_relation_filters: internal_relation_filters(),
committed_relation_filters: internal_relation_filters(),
in_flight_relation_filters: internal_relation_filters() | nil,
waiters: %{Electric.relation_id() => [waiter(), ...]},
publication_name: String.t(),
publishes_generated_columns?: boolean(),
Expand Down Expand Up @@ -265,16 +272,20 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
# Handles a per-relation configuration result tagged `{:ok, :dropped}`:
# removes `oid` from the committed filter set and then passes the updated
# state through clear_in_flight_if_committed/1. The reply carries
# `state.publication_refresh_period` as the GenServer timeout.
# NOTE(review): the first `{:noreply, ...}` tuple below is not the last
# expression of the body, so its value is discarded; it looks like the
# pre-change line of a diff that lost its +/- markers — confirm and remove.
def handle_cast({:relation_configuration_result, {oid, _rel}, {:ok, :dropped}}, state) do
new_committed_filters = MapSet.delete(state.committed_relation_filters, oid)

{:noreply, %{state | committed_relation_filters: new_committed_filters},
state.publication_refresh_period}
state =
clear_in_flight_if_committed(%{state | committed_relation_filters: new_committed_filters})

{:noreply, state, state.publication_refresh_period}
end

# Handles a per-relation configuration result tagged `{:ok, :configured}`:
# hands `:ok` to reply_to_relation_waiters/3 for this relation (presumably
# answering callers blocked on it — verify against that helper), adds `oid`
# to the committed filter set, and then passes the updated state through
# clear_in_flight_if_committed/1.
# NOTE(review): the first `{:noreply, ...}` tuple below is not the last
# expression of the body, so its value is discarded; it looks like the
# pre-change line of a diff that lost its +/- markers — confirm and remove.
def handle_cast({:relation_configuration_result, {oid, _} = oid_rel, {:ok, :configured}}, state) do
state = reply_to_relation_waiters(oid_rel, :ok, state)
new_committed_filters = MapSet.put(state.committed_relation_filters, oid)

{:noreply, %{state | committed_relation_filters: new_committed_filters},
state.publication_refresh_period}
state =
clear_in_flight_if_committed(%{state | committed_relation_filters: new_committed_filters})

{:noreply, state, state.publication_refresh_period}
end

def handle_cast({:relation_configuration_result, oid_rel, {:error, error}}, state) do
Expand All @@ -296,11 +307,12 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
)
end

{:noreply, state, state.publication_refresh_period}
{:noreply, %{state | in_flight_relation_filters: nil}, state.publication_refresh_period}
end

# Handles a global `{:configuration_error, {:error, error}}` message: passes
# the error to reply_to_all_waiters/2 and resets `in_flight_relation_filters`
# to nil, which is the condition update_needed?/1 requires before its
# "committed lags submitted" retry check can fire again.
# NOTE(review): read literally, reply_to_all_waiters/2 is invoked twice here
# and the first `{:noreply, ...}` tuple is discarded; the first line looks
# like the pre-change line of a diff that lost its +/- markers — confirm and
# remove.
def handle_cast({:configuration_error, {:error, error}}, state) do
{:noreply, reply_to_all_waiters({:error, error}, state), state.publication_refresh_period}
state = %{reply_to_all_waiters({:error, error}, state) | in_flight_relation_filters: nil}
{:noreply, state, state.publication_refresh_period}
end

@impl true
Expand Down Expand Up @@ -328,7 +340,11 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
expand_oids(state.prepared_relation_filters, state)
)

%{state | submitted_relation_filters: state.prepared_relation_filters}
%{
state
| submitted_relation_filters: state.prepared_relation_filters,
in_flight_relation_filters: state.prepared_relation_filters
}
end

@spec expand_oids(MapSet.t(Electric.relation_id()), state()) ::
Expand All @@ -345,9 +361,34 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
# Returns true when either
#   * the prepared filter set differs from the submitted one, or
#   * the submitted set differs from the committed one AND no submission is
#     recorded as in flight (`in_flight_relation_filters` is nil).
# NOTE(review): the repeated `committed_relation_filters:` key line (the one
# without a trailing comma) and the bare `not MapSet.equal?(...) or ...`
# expression at the top of the body look like pre-change lines of a diff that
# lost its +/- markers — confirm and remove; as shown the pattern is missing a
# comma between the two key lines.
defp update_needed?(%__MODULE__{
prepared_relation_filters: prepared,
submitted_relation_filters: submitted,
committed_relation_filters: committed
committed_relation_filters: committed,
in_flight_relation_filters: in_flight
}) do
not MapSet.equal?(prepared, submitted) or not MapSet.equal?(submitted, committed)
prepared_changed? = not MapSet.equal?(prepared, submitted)

# The committed-lag check is the legitimate "previous attempt didn't fully
# commit, retry" signal, but it must not fire while a submission for the
# same filters is already in flight — otherwise every add_shape/remove_shape
# during that window mints a redundant cast. The publication_refresh_period
# inactivity timeout is the fallback retry if a chain dies without delivering
# any result.
retry_needed? = is_nil(in_flight) and not MapSet.equal?(submitted, committed)

prepared_changed? or retry_needed?
end

# Resets the in-flight snapshot to nil once the committed filter set equals
# it. When nothing is in flight, or the committed set has not caught up with
# the snapshot yet, the state is returned untouched.
@spec clear_in_flight_if_committed(state()) :: state()
defp clear_in_flight_if_committed(%__MODULE__{in_flight_relation_filters: nil} = state),
  do: state

defp clear_in_flight_if_committed(%__MODULE__{} = state) do
  if MapSet.equal?(state.committed_relation_filters, state.in_flight_relation_filters) do
    %{state | in_flight_relation_filters: nil}
  else
    state
  end
end

@spec add_shape_to_publication_filters(shape_handle(), publication_filter(), state()) :: state()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,144 @@ defmodule Electric.Replication.PublicationManagerTest do
end
end

describe "cast issuance" do
@tag update_debounce_timeout: 0
test "issues a single configure cast while a submission is in flight", ctx do
  observer = self()

  # Swallow Configurator casts and report them to the test process instead.
  # Nothing is forwarded, so no result ever comes back and `committed` keeps
  # lagging behind `submitted` for the whole test: the in-flight window.
  forward_cast = fn _stack_id, filters -> send(observer, {:configure_cast, filters}) end

  Repatch.patch(
    PublicationManager.Configurator,
    :configure_publication,
    [mode: :shared],
    forward_cast
  )

  # All three shapes target the SAME relation (same oid), i.e. one relation
  # transition. They are added asynchronously because every add_shape call
  # blocks as a waiter while no result is delivered.
  [
    {@shape_handle_1, @where_clause_1},
    {@shape_handle_2, @where_clause_2},
    {@shape_handle_3, @where_clause_3}
  ]
  |> Enum.each(fn {handle, where} ->
    shape = generate_shape(ctx.relation_with_oid, where)
    run_async(fn -> PublicationManager.add_shape(ctx.stack_id, handle, shape) end)
  end)

  # One cast only, and it carries the single-relation filter set.
  assert_receive {:configure_cast, submitted_filters}
  assert MapSet.size(submitted_filters) == 1
  refute_receive {:configure_cast, _}, 200
end

@tag update_debounce_timeout: 0
test "re-issues a configure cast after a global configuration error", ctx do
  observer = self()

  # Report each Configurator cast to the test process without forwarding it.
  notify = fn _stack_id, _filters -> send(observer, :configure_cast) end

  Repatch.patch(
    PublicationManager.Configurator,
    :configure_publication,
    [mode: :shared],
    notify
  )

  tracker = PublicationManager.RelationTracker.name(ctx.stack_id)

  first_shape = generate_shape(ctx.relation_with_oid, @where_clause_1)

  run_async(fn ->
    PublicationManager.add_shape(ctx.stack_id, @shape_handle_1, first_shape)
  end)

  # The first submission is in flight from here on.
  assert_receive :configure_cast

  # Pretend the in-flight chain died with a global configuration error.
  failure = {:error, %RuntimeError{message: "boom"}}
  GenServer.cast(tracker, {:configuration_error, failure})

  # Another add on the SAME relation leaves the prepared filters unchanged,
  # so a fresh cast can only come from the re-armed retry path.
  second_shape = generate_shape(ctx.relation_with_oid, @where_clause_2)

  run_async(fn ->
    PublicationManager.add_shape(ctx.stack_id, @shape_handle_2, second_shape)
  end)

  assert_receive :configure_cast, 500
end

@tag update_debounce_timeout: 0
test "still issues a cast for a new relation while another submission is in flight", ctx do
  create_table_sql = "CREATE TABLE other_table (id UUID PRIMARY KEY, value TEXT NOT NULL)"
  Postgrex.query!(ctx.pool, create_table_sql, [])

  other_relation = {"public", "other_table"}
  other_oid = lookup_relation_oid(ctx.pool, other_relation)

  observer = self()

  # Report each Configurator cast, with its filter set, to the test process
  # without forwarding it.
  forward_cast = fn _stack_id, filters -> send(observer, {:configure_cast, filters}) end

  Repatch.patch(
    PublicationManager.Configurator,
    :configure_publication,
    [mode: :shared],
    forward_cast
  )

  first_shape = generate_shape(ctx.relation_with_oid, @where_clause_1)

  run_async(fn ->
    PublicationManager.add_shape(ctx.stack_id, @shape_handle_1, first_shape)
  end)

  # The first relation's submission is in flight; no result is delivered.
  assert_receive {:configure_cast, filters_after_first}
  assert MapSet.size(filters_after_first) == 1

  # A DIFFERENT relation changes `prepared`, so a new cast has to go out even
  # though the earlier submission is still in flight.
  second_shape = generate_shape({other_oid, other_relation}, @where_clause_1)

  run_async(fn ->
    PublicationManager.add_shape(ctx.stack_id, @shape_handle_2, second_shape)
  end)

  assert_receive {:configure_cast, filters_after_second}, 500
  assert MapSet.size(filters_after_second) == 2
end

@tag update_debounce_timeout: 0
test "re-issues a configure cast after a per-relation configuration error", ctx do
  observer = self()

  # Report each Configurator cast to the test process without forwarding it.
  notify = fn _stack_id, _filters -> send(observer, :configure_cast) end

  Repatch.patch(
    PublicationManager.Configurator,
    :configure_publication,
    [mode: :shared],
    notify
  )

  tracker = PublicationManager.RelationTracker.name(ctx.stack_id)
  failing_relation = ctx.relation_with_oid

  first_shape = generate_shape(ctx.relation_with_oid, @where_clause_1)

  run_async(fn ->
    PublicationManager.add_shape(ctx.stack_id, @shape_handle_1, first_shape)
  end)

  # The first submission is in flight from here on.
  assert_receive :configure_cast

  # Pretend the in-flight chain reported a per-relation error for the very
  # relation it was configuring.
  failure = {:error, %RuntimeError{message: "boom"}}
  GenServer.cast(tracker, {:relation_configuration_result, failing_relation, failure})

  # Another add on the SAME relation leaves the prepared filters unchanged,
  # so a fresh cast can only come from the re-armed retry path.
  second_shape = generate_shape(ctx.relation_with_oid, @where_clause_2)

  run_async(fn ->
    PublicationManager.add_shape(ctx.stack_id, @shape_handle_2, second_shape)
  end)

  assert_receive :configure_cast, 500
end
end

describe "remove_shape/2" do
test "removes single relation when last shape removed", ctx do
shape = generate_shape(ctx.relation_with_oid, @where_clause_1)
Expand Down
Loading