From 8e7a30aa83e2791a938576348f002c03b4f7b429 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:20:06 +0200 Subject: [PATCH 1/3] fix(sync-service): suppress redundant publication-config casts while a submission is in flight (#4396) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../publication_manager/relation_tracker.ex | 54 ++++++++++++++++--- .../replication/publication_manager_test.exs | 34 ++++++++++++ 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex b/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex index 8c1d41e07f..4114132d92 100644 --- a/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex +++ b/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex @@ -29,6 +29,12 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do prepared_relation_filters: MapSet.new(), submitted_relation_filters: MapSet.new(), committed_relation_filters: MapSet.new(), + # Snapshot of the filter set carried by the cast currently being processed + # by the Configurator. While set, the "committed lags submitted" retry in + # update_needed?/1 is suppressed so a burst of add_shape/remove_shape calls + # cannot mint one redundant cast each. Cleared on a terminal signal from the + # Configurator (full commit, per-relation error, or global error). + in_flight_relation_filters: nil, waiters: %{}, # start with optimistic assumption about what the # publication supports (altering and generated columns) @@ -48,6 +54,7 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do prepared_relation_filters: internal_relation_filters(), submitted_relation_filters: internal_relation_filters(), committed_relation_filters: internal_relation_filters(), + in_flight_relation_filters: internal_relation_filters() | nil, waiters: %{Electric.relation_id() => [waiter(), ...]}, publication_name: String.t(), publishes_generated_columns?: boolean(), @@ -265,16 +272,20 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do def handle_cast({:relation_configuration_result, {oid, _rel}, {:ok, :dropped}}, state) do new_committed_filters = MapSet.delete(state.committed_relation_filters, oid) - {:noreply, %{state | committed_relation_filters: new_committed_filters}, - state.publication_refresh_period} + state = + clear_in_flight_if_committed(%{state | committed_relation_filters: new_committed_filters}) + + {:noreply, state, state.publication_refresh_period} end def handle_cast({:relation_configuration_result, {oid, _} = oid_rel, {:ok, :configured}}, state) do state = reply_to_relation_waiters(oid_rel, :ok, state) new_committed_filters = MapSet.put(state.committed_relation_filters, oid) - {:noreply, %{state | committed_relation_filters: new_committed_filters}, - state.publication_refresh_period} + state = + clear_in_flight_if_committed(%{state | committed_relation_filters: new_committed_filters}) + + {:noreply, state, state.publication_refresh_period} end def handle_cast({:relation_configuration_result, oid_rel, {:error, error}}, state) do @@ -328,7 +339,11 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do expand_oids(state.prepared_relation_filters, state) ) - %{state | submitted_relation_filters: state.prepared_relation_filters} + %{ + state + | submitted_relation_filters: state.prepared_relation_filters, + in_flight_relation_filters: state.prepared_relation_filters + } end @spec expand_oids(MapSet.t(Electric.relation_id()), state()) :: @@ -345,9 +360,34 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do defp update_needed?(%__MODULE__{ prepared_relation_filters: prepared, submitted_relation_filters: submitted, - committed_relation_filters: committed + committed_relation_filters: committed, + in_flight_relation_filters: in_flight }) do - not MapSet.equal?(prepared, submitted) or not MapSet.equal?(submitted, committed) + prepared_changed? = not MapSet.equal?(prepared, submitted) + + # The committed-lag check is the legitimate "previous attempt didn't fully + # commit, retry" signal, but it must not fire while a submission for the + # same filters is already in flight — otherwise every add_shape/remove_shape + # during that window mints a redundant cast. The publication_refresh_period + # inactivity timeout is the fallback retry if a chain dies without delivering + # any result. + retry_needed? = is_nil(in_flight) and not MapSet.equal?(submitted, committed) + + prepared_changed? or retry_needed? + end + + @spec clear_in_flight_if_committed(state()) :: state() + defp clear_in_flight_if_committed( + %__MODULE__{ + committed_relation_filters: committed, + in_flight_relation_filters: in_flight + } = state + ) do + if not is_nil(in_flight) and MapSet.equal?(committed, in_flight) do + %{state | in_flight_relation_filters: nil} + else + state + end end @spec add_shape_to_publication_filters(shape_handle(), publication_filter(), state()) :: state() diff --git a/packages/sync-service/test/electric/replication/publication_manager_test.exs b/packages/sync-service/test/electric/replication/publication_manager_test.exs index 6158fe1cb0..1b21a24f4c 100644 --- a/packages/sync-service/test/electric/replication/publication_manager_test.exs +++ b/packages/sync-service/test/electric/replication/publication_manager_test.exs @@ -233,6 +233,40 @@ defmodule Electric.Replication.PublicationManagerTest do end end + describe "cast issuance" do + @tag update_debounce_timeout: 0 + test "issues a single configure cast while a submission is in flight", ctx do + test_pid = self() + + # Capture casts to the Configurator without forwarding them, so the + # submission never produces a result and `committed` stays behind + # `submitted` for the whole test — exactly the in-flight window. + Repatch.patch( + PublicationManager.Configurator, + :configure_publication, + [mode: :shared], + fn _stack_id, filters -> send(test_pid, {:configure_cast, filters}) end + ) + + # Three shapes on the SAME relation (same oid) → a single relation + # transition. Run them async because, with no result delivered, each + # add_shape blocks as a waiter. + for {handle, where} <- [ + {@shape_handle_1, @where_clause_1}, + {@shape_handle_2, @where_clause_2}, + {@shape_handle_3, @where_clause_3} + ] do + shape = generate_shape(ctx.relation_with_oid, where) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, handle, shape) end) + end + + # Exactly one cast, carrying the single-relation filter set. + assert_receive {:configure_cast, filters} + assert MapSet.size(filters) == 1 + refute_receive {:configure_cast, _}, 200 + end + end + describe "remove_shape/2" do test "removes single relation when last shape removed", ctx do shape = generate_shape(ctx.relation_with_oid, @where_clause_1) From 66e6cb9399d4f0d1ed4268946830308051d2604f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:26:25 +0200 Subject: [PATCH 2/3] fix(sync-service): re-arm publication-config retries after terminal errors (#4396) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../publication_manager/relation_tracker.ex | 5 +- .../replication/publication_manager_test.exs | 69 +++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex b/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex index 4114132d92..4dde321c14 100644 --- a/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex +++ b/packages/sync-service/lib/electric/replication/publication_manager/relation_tracker.ex @@ -307,11 +307,12 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do ) end - {:noreply, state, state.publication_refresh_period} + {:noreply, %{state | in_flight_relation_filters: nil}, state.publication_refresh_period} end def handle_cast({:configuration_error, {:error, error}}, state) do - {:noreply, reply_to_all_waiters({:error, error}, state), state.publication_refresh_period} + state = %{reply_to_all_waiters({:error, error}, state) | in_flight_relation_filters: nil} + {:noreply, state, state.publication_refresh_period} end @impl true diff --git a/packages/sync-service/test/electric/replication/publication_manager_test.exs b/packages/sync-service/test/electric/replication/publication_manager_test.exs index 1b21a24f4c..0fc4ed0884 100644 --- a/packages/sync-service/test/electric/replication/publication_manager_test.exs +++ b/packages/sync-service/test/electric/replication/publication_manager_test.exs @@ -265,6 +265,75 @@ defmodule Electric.Replication.PublicationManagerTest do assert MapSet.size(filters) == 1 refute_receive {:configure_cast, _}, 200 end + + @tag update_debounce_timeout: 0 + test "re-issues a configure cast after a global configuration error", ctx do + test_pid = self() + + Repatch.patch( + PublicationManager.Configurator, + :configure_publication, + [mode: :shared], + fn _stack_id, _filters -> send(test_pid, :configure_cast) end + ) + + relation_tracker = PublicationManager.RelationTracker.name(ctx.stack_id) + + shape1 = generate_shape(ctx.relation_with_oid, @where_clause_1) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, @shape_handle_1, shape1) end) + + # First submission is now in flight. + assert_receive :configure_cast + + # Simulate the in-flight chain dying with a global configuration error. + GenServer.cast( + relation_tracker, + {:configuration_error, {:error, %RuntimeError{message: "boom"}}} + ) + + # A subsequent add on the SAME relation (no change to prepared filters) + # must re-arm the retry path and issue a fresh cast. + shape2 = generate_shape(ctx.relation_with_oid, @where_clause_2) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, @shape_handle_2, shape2) end) + + assert_receive :configure_cast, 500 + end + + @tag update_debounce_timeout: 0 + test "still issues a cast for a new relation while another submission is in flight", ctx do + Postgrex.query!( + ctx.pool, + "CREATE TABLE other_table (id UUID PRIMARY KEY, value TEXT NOT NULL)", + [] + ) + + alt_relation = {"public", "other_table"} + alt_relation_oid = lookup_relation_oid(ctx.pool, alt_relation) + + test_pid = self() + + Repatch.patch( + PublicationManager.Configurator, + :configure_publication, + [mode: :shared], + fn _stack_id, filters -> send(test_pid, {:configure_cast, filters}) end + ) + + shape1 = generate_shape(ctx.relation_with_oid, @where_clause_1) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, @shape_handle_1, shape1) end) + + # First relation's submission is in flight (no result delivered). + assert_receive {:configure_cast, first_filters} + assert MapSet.size(first_filters) == 1 + + # Adding a DIFFERENT relation changes `prepared`, so a fresh cast must + # still be issued despite the in-flight submission. + shape2 = generate_shape({alt_relation_oid, alt_relation}, @where_clause_1) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, @shape_handle_2, shape2) end) + + assert_receive {:configure_cast, second_filters}, 500 + assert MapSet.size(second_filters) == 2 + end end describe "remove_shape/2" do From 924522f766fb8e222cfcd1f1bd78c094577061a7 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:34:01 +0200 Subject: [PATCH 3/3] chore(sync-service): changeset + per-relation-error re-arm test for publication-config cast suppression (#4396) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../suppress-publication-config-casts.md | 5 +++ .../replication/publication_manager_test.exs | 35 +++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 .changeset/suppress-publication-config-casts.md diff --git a/.changeset/suppress-publication-config-casts.md b/.changeset/suppress-publication-config-casts.md new file mode 100644 index 0000000000..b9d27f126d --- /dev/null +++ b/.changeset/suppress-publication-config-casts.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Suppress redundant publication-configuration casts while a submission is already in flight. Under a burst of shape arrivals the publication manager's RelationTracker no longer mints one Configurator cast per add_shape/remove_shape, preventing the `publication_manager_configurator` mailbox from growing unboundedly (issue #4396). diff --git a/packages/sync-service/test/electric/replication/publication_manager_test.exs b/packages/sync-service/test/electric/replication/publication_manager_test.exs index 0fc4ed0884..097604f609 100644 --- a/packages/sync-service/test/electric/replication/publication_manager_test.exs +++ b/packages/sync-service/test/electric/replication/publication_manager_test.exs @@ -334,6 +334,41 @@ defmodule Electric.Replication.PublicationManagerTest do assert_receive {:configure_cast, second_filters}, 500 assert MapSet.size(second_filters) == 2 end + + @tag update_debounce_timeout: 0 + test "re-issues a configure cast after a per-relation configuration error", ctx do + test_pid = self() + + Repatch.patch( + PublicationManager.Configurator, + :configure_publication, + [mode: :shared], + fn _stack_id, _filters -> send(test_pid, :configure_cast) end + ) + + relation_tracker = PublicationManager.RelationTracker.name(ctx.stack_id) + oid_rel = ctx.relation_with_oid + + shape1 = generate_shape(ctx.relation_with_oid, @where_clause_1) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, @shape_handle_1, shape1) end) + + # First submission is now in flight. + assert_receive :configure_cast + + # Simulate the in-flight chain reporting a per-relation error for the + # relation it was configuring. + GenServer.cast( + relation_tracker, + {:relation_configuration_result, oid_rel, {:error, %RuntimeError{message: "boom"}}} + ) + + # A subsequent add on the SAME relation (no change to prepared filters) + # must re-arm the retry path and issue a fresh cast. + shape2 = generate_shape(ctx.relation_with_oid, @where_clause_2) + run_async(fn -> PublicationManager.add_shape(ctx.stack_id, @shape_handle_2, shape2) end) + + assert_receive :configure_cast, 500 + end end describe "remove_shape/2" do