From b80ba220246436d350edb14089299ed2dd690e51 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 13:53:28 +0200 Subject: [PATCH 01/10] feat(consumer): make consumer-family spawn_opts configurable per process --- .../lib/electric/shapes/consumer.ex | 3 +- .../electric/shapes/consumer/materializer.ex | 5 +- .../electric/shapes/consumer/snapshotter.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 71 +++++++++++++++++++ 4 files changed, 81 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 3ab114d0f5..24c32b3a70 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -112,7 +112,8 @@ defmodule Electric.Shapes.Consumer do def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, - name: name(stack_id, shape_handle) + name: name(stack_id, shape_handle), + spawn_opt: Electric.StackConfig.spawn_opts(stack_id, :consumer) ) end diff --git a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex index c98dc4c1f0..08c6c97bf5 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex @@ -111,7 +111,10 @@ defmodule Electric.Shapes.Consumer.Materializer do do: subscribe(%{stack_id: stack_id, shape_handle: shape_handle}) def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: name(opts)) + GenServer.start_link(__MODULE__, opts, + name: name(opts), + spawn_opt: Electric.StackConfig.spawn_opts(opts.stack_id, :consumer_materializer) + ) end def init(opts) do diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 8ff3aa14be..2a64f916f9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -20,7 +20,10 @@ defmodule Electric.Shapes.Consumer.Snapshotter do end def start_link(config) when is_map(config) do - GenServer.start_link(__MODULE__, config, name: name(config)) + GenServer.start_link(__MODULE__, config, + name: name(config), + spawn_opt: Electric.StackConfig.spawn_opts(config.stack_id, :consumer_snapshotter) + ) end def init(config) do diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 5b2d2cce2d..2c6df6550a 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2509,6 +2509,77 @@ defmodule Electric.Shapes.ConsumerTest do ) end + describe "process gc configuration" do + setup [ + :with_registry, + :with_in_memory_storage, + :with_shape_status, + :with_lsn_tracker, + :with_persistent_kv, + :with_status_monitor, + :with_dynamic_consumer_supervisor, + :with_noop_publication_manager, + :with_shape_cleaner + ] + + setup ctx do + start_link_supervised!({ + ShapeLogCollector.Supervisor, + stack_id: ctx.stack_id, persistent_kv: ctx.persistent_kv, inspector: @base_inspector + }) + + ShapeLogCollector.mark_as_ready(ctx.stack_id) + [shape_position: @shape_position] + end + + @tag process_spawn_opts: %{consumer: [fullsweep_after: 4, priority: :high]} + test "spawn_opts are correctly passed to consumer process", ctx do + support_test_storage_wrap(ctx, @shape_handle1, @shape1) + + {:ok, consumer} = + start_supervised( + {Consumer, + %{ + shape_handle: @shape_handle1, + stack_id: ctx.stack_id + }}, + id: {Consumer, @shape_handle1} + ) + + Consumer.initialize_shape(consumer, @shape1, %{action: :create}) + assert_receive {Support.TestStorage, :init_writer!, @shape_handle1, @shape1} + :started = Consumer.await_snapshot_start(ctx.stack_id, @shape_handle1) + + pid = Consumer.name(ctx.stack_id, @shape_handle1) |> GenServer.whereis() + info = Process.info(pid) + + assert info[:priority] == :high + assert info[:garbage_collection][:fullsweep_after] == 4 + end + end + + defp support_test_storage_wrap(ctx, shape_handle, shape) do + %{snapshot_xmin: xmin} = shape_status(shape_handle, ctx) + shapes = %{shape_handle => shape} + + storage = + Support.TestStorage.wrap(ctx.storage, %{ + shape_handle => [ + {:mark_snapshot_as_started, []}, + {:set_pg_snapshot, [%{xmin: xmin, xmax: xmin + 1, xip_list: [xmin]}]} + ] + }) + + Electric.StackConfig.put(ctx.stack_id, Electric.ShapeCache.Storage, storage) + Electric.StackConfig.put(ctx.stack_id, :inspector, @base_inspector) + + patch_shape_status(fetch_shape_by_handle: fn _, sh -> Map.fetch(shapes, sh) end) + + Support.TestUtils.activate_mocks_for_descendant_procs(Consumer) + Support.TestUtils.activate_mocks_for_descendant_procs(Electric.ShapeCache.ShapeCleaner) + :ok + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end From a8f933dbd37c0b6e97234b8c083ad4f1c5f69990 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 13:57:09 +0200 Subject: [PATCH 02/10] test(consumer): use supervised pid directly in spawn_opts test --- packages/sync-service/test/electric/shapes/consumer_test.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 2c6df6550a..372364d89b 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2550,8 +2550,7 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {Support.TestStorage, :init_writer!, @shape_handle1, @shape1} :started = Consumer.await_snapshot_start(ctx.stack_id, @shape_handle1) - pid = Consumer.name(ctx.stack_id, @shape_handle1) |> GenServer.whereis() - info = Process.info(pid) + info = Process.info(consumer) assert info[:priority] == :high assert info[:garbage_collection][:fullsweep_after] == 4 From c20461a0ff8c1e74cd3c08bc5c551686f7a60599 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:00:34 +0200 Subject: [PATCH 03/10] feat(consumer): add consumer_gc_heap_threshold stack config (default nil) --- packages/sync-service/config/runtime.exs | 1 + packages/sync-service/lib/electric/config.ex | 4 ++++ packages/sync-service/lib/electric/stack_config.ex | 3 ++- packages/sync-service/lib/electric/stack_supervisor.ex | 6 ++++++ 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 6dbbefcb59..c8648effe2 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -287,6 +287,7 @@ config :electric, process_registry_partitions: env!("ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS", :integer, nil), process_spawn_opts: env!("ELECTRIC_PROCESS_SPAWN_OPTS", &Electric.Config.parse_spawn_opts!/1, %{}), + consumer_gc_heap_threshold: env!("ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD", :integer, nil), http_api_num_acceptors: env!("ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS", :integer, 100), conn_max_requests: env!("ELECTRIC_TWEAKS_CONN_MAX_REQUESTS", :integer, nil), handler_fullsweep_after: env!("ELECTRIC_TWEAKS_HANDLER_FULLSWEEP_AFTER", :integer, nil), diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index c33ac3c271..6eb2c322f3 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -109,6 +109,10 @@ defmodule Electric.Config do # # e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]} process_spawn_opts: %{}, + # Heap-size threshold (in BYTES) above which a consumer runs :erlang.garbage_collect() + # after processing a transaction fragment. nil disables adaptive GC. Looked up at + # runtime via StackConfig so it can be changed from a live IEx shell. + consumer_gc_heap_threshold: nil, ## Misc process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0, feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []), diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..27f8378309 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -32,7 +32,8 @@ defmodule Electric.StackConfig do shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], - process_spawn_opts: %{} + process_spawn_opts: %{}, + consumer_gc_heap_threshold: Electric.Config.default(:consumer_gc_heap_threshold) ] end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 23912c21be..fd30eeb485 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -150,6 +150,10 @@ defmodule Electric.StackSupervisor do default: nil ], process_spawn_opts: [type: :map, default: %{}], + consumer_gc_heap_threshold: [ + type: {:or, [:non_neg_integer, nil]}, + default: Electric.Config.default(:consumer_gc_heap_threshold) + ], consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil] ] ], @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) + consumer_gc_heap_threshold = Keyword.fetch!(config.tweaks, :consumer_gc_heap_threshold) shape_cache_opts = [ stack_id: stack_id @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, process_spawn_opts: process_spawn_opts, + consumer_gc_heap_threshold: consumer_gc_heap_threshold, feature_flags: Map.get(config, :feature_flags, []) ]}, {Electric.AsyncDeleter, From 3fa95b4d3d47d79d2cc8ca366a8a897106b180b7 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:03:26 +0200 Subject: [PATCH 04/10] fix(consumer): thread consumer_gc_heap_threshold env into stack tweaks --- .../sync-service/lib/electric/application.ex | 3 +- .../test/electric/config_test.exs | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index c46b55c7b9..4a18cd1b8f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -155,7 +155,8 @@ defmodule Electric.Application do shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), - process_spawn_opts: get_env(opts, :process_spawn_opts) + process_spawn_opts: get_env(opts, :process_spawn_opts), + consumer_gc_heap_threshold: get_env(opts, :consumer_gc_heap_threshold) ], manual_table_publishing?: get_env(opts, :manual_table_publishing?), shape_db_opts: [ diff --git a/packages/sync-service/test/electric/config_test.exs b/packages/sync-service/test/electric/config_test.exs index 366c4352ec..f9b647b7a9 100644 --- a/packages/sync-service/test/electric/config_test.exs +++ b/packages/sync-service/test/electric/config_test.exs @@ -120,4 +120,43 @@ defmodule Electric.ConfigTest do end end end + + describe "tweaks propagation" do + setup do + initial_config = Application.get_all_env(:electric) + + for {key, _} <- initial_config do + Application.delete_env(:electric, key) + end + + on_exit(fn -> + Application.put_all_env([{:electric, initial_config}]) + end) + + [initial_config: initial_config] + end + + test "consumer_gc_heap_threshold opt is threaded into tweaks", ctx do + threshold = 209_715_200 + + config = + Electric.Application.configuration( + Keyword.merge( + Keyword.take(ctx.initial_config, [:replication_connection_opts]), + consumer_gc_heap_threshold: threshold + ) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == threshold + end + + test "consumer_gc_heap_threshold defaults to nil in tweaks", ctx do + config = + Electric.Application.configuration( + Keyword.take(ctx.initial_config, [:replication_connection_opts]) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == nil + end + end end From bb2d06b933f1d7d8963b31ab8bc70dd2a0ff3a6f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:09:46 +0200 Subject: [PATCH 05/10] feat(consumer): adaptive GC after fragment when heap exceeds threshold --- .../lib/electric/shapes/consumer.ex | 27 +++- .../test/electric/shapes/consumer_test.exs | 131 ++++++++++++++++++ 2 files changed, 157 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 24c32b3a70..754e1f6252 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -479,7 +479,32 @@ defmodule Electric.Shapes.Consumer do defp handle_event(%TransactionFragment{} = txn_fragment, state) do Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end) - handle_txn_fragment(txn_fragment, state) + + txn_fragment + |> handle_txn_fragment(state) + |> maybe_garbage_collect() + end + + defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do + threshold = Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) + + if not is_nil(threshold) do + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + + if over_heap_threshold?(heap_words, threshold) do + :erlang.garbage_collect() + end + end + + state + end + + @doc false + # heap_words: process total_heap_size in words; threshold_bytes: configured byte threshold (or nil) + def over_heap_threshold?(_heap_words, nil), do: false + + def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do + heap_words * :erlang.system_info(:wordsize) > threshold_bytes end # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 372364d89b..053515ac6f 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2579,6 +2579,137 @@ defmodule Electric.Shapes.ConsumerTest do :ok end + describe "over_heap_threshold?/2" do + test "false when threshold is nil" do + refute Electric.Shapes.Consumer.over_heap_threshold?(1_000_000, nil) + end + + test "false when heap (words) is below threshold (bytes)" do + refute Electric.Shapes.Consumer.over_heap_threshold?(10, 1024) + end + + test "true when heap (words) exceeds threshold (bytes)" do + assert Electric.Shapes.Consumer.over_heap_threshold?(1000, 1) + end + + test "exactly-equal is not over threshold" do + wordsize = :erlang.system_info(:wordsize) + refute Electric.Shapes.Consumer.over_heap_threshold?(1, wordsize) + end + end + + describe "adaptive GC after fragment processing" do + @describetag :tmp_dir + + setup do + %{inspector: @base_inspector, pool: nil} + end + + setup [ + :with_registry, + :with_pure_file_storage, + :with_shape_status, + :with_lsn_tracker, + :with_log_chunking, + :with_persistent_kv, + :with_async_deleter, + :with_shape_cleaner, + :with_shape_log_collector, + :with_noop_publication_manager, + :with_status_monitor + ] + + setup(ctx) do + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + pg_snapshot = {10, 11, [10]} + GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) + GenServer.cast(parent, {:snapshot_started, shape_handle}) + snapshot_fun.([]) + end) + + Electric.StackConfig.put(ctx.stack_id, :shape_hibernate_after, 10_000) + :ok + end + + setup ctx do + %{consumer_supervisor: consumer_supervisor, shape_cache: shape_cache} = + Support.ComponentSetup.with_shape_cache(ctx) + + %{ + consumer_supervisor: consumer_supervisor, + shape_cache: shape_cache + } + end + + test "GC runs when heap exceeds tiny threshold", ctx do + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + {:total_heap_size, heap_before} = :erlang.process_info(consumer_pid, :total_heap_size) + + # Inflate the consumer's heap by sending a large binary payload + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Wait for the consumer to process the fragment + assert_receive {^ref, :new_changes, _}, @receive_timeout + + {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) + + # GC should have run (heap_after <= heap_before is the indicator), + # or at minimum the heap hasn't grown unboundedly relative to what we sent. + # Since threshold=1 forces GC on every fragment, the heap post-GC should + # be well below the inflated size. We verify the GC fired by checking that + # the post-fragment heap is not larger than the pre-fragment heap (i.e., GC ran). + assert heap_after <= heap_before * 2, + "Expected GC to reclaim heap after large fragment (before=#{heap_before} words, after=#{heap_after} words)" + end + + test "no GC by default (threshold=nil)", ctx do + # Ensure no threshold is set (default behaviour) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + # Should process without error even when no GC threshold is configured + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + end + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end From d3088fc4f19c0d82b9273996c7cf91c8051a9989 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:19:47 +0200 Subject: [PATCH 06/10] refactor(consumer): cleaner adaptive-GC nil path + payload-relative GC test + negative test --- .../lib/electric/shapes/consumer.ex | 19 ++++--- .../test/electric/shapes/consumer_test.exs | 54 ++++++++++++++++--- 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 754e1f6252..99a8253f60 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -38,6 +38,7 @@ defmodule Electric.Shapes.Consumer do @default_snapshot_timeout 45_000 @stop_and_clean_timeout 30_000 @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() + @word_size :erlang.system_info(:wordsize) @type initialize_shape_opts() :: %{ :action => :create | :restore, @@ -486,17 +487,15 @@ defmodule Electric.Shapes.Consumer do end defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do - threshold = Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) - - if not is_nil(threshold) do - {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + case Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) do + nil -> + state - if over_heap_threshold?(heap_words, threshold) do - :erlang.garbage_collect() - end + threshold -> + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + if over_heap_threshold?(heap_words, threshold), do: :erlang.garbage_collect() + state end - - state end @doc false @@ -504,7 +503,7 @@ defmodule Electric.Shapes.Consumer do def over_heap_threshold?(_heap_words, nil), do: false def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do - heap_words * :erlang.system_info(:wordsize) > threshold_bytes + heap_words * @word_size > threshold_bytes end # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 053515ac6f..9b5f671901 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2653,7 +2653,46 @@ defmodule Electric.Shapes.ConsumerTest do xid = 11 lsn = Lsn.from_integer(10) - {:total_heap_size, heap_before} = :erlang.process_info(consumer_pid, :total_heap_size) + # Inflate the consumer's heap by sending a large binary payload + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Wait for the consumer to process the fragment + assert_receive {^ref, :new_changes, _}, @receive_timeout + + {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) + + # threshold=1 forces a full GC after this fragment. Because the ~200 KB payload is + # transient garbage (not live state), the post-GC heap must be far below the payload + # size. Without GC the heap grows ~185x (observed), so this assertion fails loudly. + payload_words = div(200_000, :erlang.system_info(:wordsize)) + + assert heap_after < payload_words, + "heap_after (#{heap_after} words) should be far below payload (#{payload_words} words) after GC" + end + + test "GC does not run when threshold is very large", ctx do + # 1 GB threshold — the consumer heap will never reach this, so GC must NOT fire. + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1_000_000_000) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) # Inflate the consumer's heap by sending a large binary payload large_binary = :binary.copy(<<0>>, 200_000) @@ -2674,13 +2713,12 @@ defmodule Electric.Shapes.ConsumerTest do {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) - # GC should have run (heap_after <= heap_before is the indicator), - # or at minimum the heap hasn't grown unboundedly relative to what we sent. - # Since threshold=1 forces GC on every fragment, the heap post-GC should - # be well below the inflated size. We verify the GC fired by checking that - # the post-fragment heap is not larger than the pre-fragment heap (i.e., GC ran). - assert heap_after <= heap_before * 2, - "Expected GC to reclaim heap after large fragment (before=#{heap_before} words, after=#{heap_after} words)" + # GC was NOT triggered (threshold too high), so the heap still reflects + # the retained payload — it must be >= payload_words. + payload_words = div(200_000, :erlang.system_info(:wordsize)) + + assert heap_after >= payload_words, + "heap_after (#{heap_after} words) should be >= payload (#{payload_words} words) when GC is skipped" end test "no GC by default (threshold=nil)", ctx do From 3918293f13370a6811f08a569b7999139cf1c295 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:23:20 +0200 Subject: [PATCH 07/10] feat(consumer): add IEx helpers to set GC heap threshold per-stack and fleet-wide --- .../lib/electric/shapes/consumer.ex | 61 +++++++++++++++++++ .../test/electric/shapes/consumer_test.exs | 26 ++++++++ 2 files changed, 87 insertions(+) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 99a8253f60..00f40c01a9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -111,6 +111,67 @@ defmodule Electric.Shapes.Consumer do ConsumerRegistry.whereis(stack_id, shape_handle) end + @doc """ + Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. + Takes effect immediately for that stack's consumers — safe to call from IEx. + """ + def set_gc_heap_threshold(stack_id, threshold_bytes) + when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do + Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, threshold_bytes) + :ok + end + + @doc """ + Set the adaptive-GC heap threshold for every live stack on this node. + Returns `{:ok, number_of_stacks_updated}`. Pass nil to disable everywhere. + Intended for live experimentation from an IEx shell. + """ + def set_gc_heap_threshold_all_stacks(threshold_bytes) do + stack_ids = list_stack_ids() + + # Guard against a stack dying between enumeration and the put: if the + # StackConfig ETS table vanishes, StackConfig.put/3 raises ArgumentError. + # We skip such stale entries rather than crashing the operator call. + Enum.each(stack_ids, fn stack_id -> + try do + set_gc_heap_threshold(stack_id, threshold_bytes) + rescue + ArgumentError -> :ok + end + end) + + {:ok, length(stack_ids)} + end + + # Enumerate live stacks by scanning ETS tables whose names match the + # Electric.StackConfig table-name prefix ("Electric.StackConfig:"). + # This is the most direct approach: StackConfig creates one named ETS table per + # stack, so the set of live tables IS the set of live stacks. + # No first-class listing API exists in the codebase (grep confirmed). + # A race (stack dies mid-iteration) is harmless: StackConfig.put/3 on a vanished + # table would raise ArgumentError, which we rescue and skip. + defp list_stack_ids do + prefix = "#{inspect(Electric.StackConfig)}:" + prefix_len = byte_size(prefix) + + :ets.all() + |> Enum.flat_map(fn tab -> + case :ets.info(tab, :name) do + :undefined -> + [] + + name -> + name_str = Atom.to_string(name) + + if String.starts_with?(name_str, prefix) do + [binary_part(name_str, prefix_len, byte_size(name_str) - prefix_len)] + else + [] + end + end + end) + end + def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, name: name(stack_id, shape_handle), diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 9b5f671901..8c69d690c8 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2748,6 +2748,32 @@ defmodule Electric.Shapes.ConsumerTest do end end + describe "set_gc_heap_threshold helpers" do + # with_stack_id_from_test (line 87) already starts ProcessRegistry + StackConfig + # for ctx.stack_id — no heavier setup is needed for these pure-config tests. + + test "set_gc_heap_threshold/2 writes the value into StackConfig", ctx do + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 2_000_000) + + assert 2_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold/2 accepts nil to disable", ctx do + Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 123) + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, nil) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold_all_stacks/1 sets the live stack", ctx do + assert {:ok, n} = Electric.Shapes.Consumer.set_gc_heap_threshold_all_stacks(3_000_000) + assert n >= 1 + + assert 3_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end From 2a7daad40b07b1c8db9b2ce78ea6e09f494b31cd Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:29:11 +0200 Subject: [PATCH 08/10] fix(consumer): count only successfully-updated stacks in all-stacks helper --- .../lib/electric/shapes/consumer.ex | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 00f40c01a9..1f0fedeb9a 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -132,15 +132,19 @@ defmodule Electric.Shapes.Consumer do # Guard against a stack dying between enumeration and the put: if the # StackConfig ETS table vanishes, StackConfig.put/3 raises ArgumentError. # We skip such stale entries rather than crashing the operator call. - Enum.each(stack_ids, fn stack_id -> - try do - set_gc_heap_threshold(stack_id, threshold_bytes) - rescue - ArgumentError -> :ok - end - end) + # Only successfully-written stacks are counted so the returned value + # reflects reality even when stacks die mid-iteration. + count = + Enum.reduce(stack_ids, 0, fn stack_id, acc -> + try do + set_gc_heap_threshold(stack_id, threshold_bytes) + acc + 1 + rescue + ArgumentError -> acc + end + end) - {:ok, length(stack_ids)} + {:ok, count} end # Enumerate live stacks by scanning ETS tables whose names match the From 004939faa563f784e89b671b03434ba388a3d64c Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:55:19 +0200 Subject: [PATCH 09/10] fix(consumer): apply adaptive GC during buffered-fragment drain + changeset --- .changeset/consumer-heap-gc.md | 5 ++ .../lib/electric/shapes/consumer.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 73 +++++++++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 .changeset/consumer-heap-gc.md diff --git a/.changeset/consumer-heap-gc.md b/.changeset/consumer-heap-gc.md new file mode 100644 index 0000000000..b95e1a28bc --- /dev/null +++ b/.changeset/consumer-heap-gc.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Bound Shape.Consumer heap growth: make the consumer family's process spawn options (incl. `fullsweep_after`) configurable per process via `ELECTRIC_PROCESS_SPAWN_OPTS`, and add an opt-in adaptive GC that runs after a transaction fragment when the consumer's heap exceeds the runtime-tunable `ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD` (off by default). diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 1f0fedeb9a..52b77e6947 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -858,7 +858,10 @@ defmodule Electric.Shapes.Consumer do {txn_fragments, state} = State.pop_buffered(state) Enum.reduce_while(txn_fragments, state, fn txn_fragment, state -> - state = handle_txn_fragment(txn_fragment, state) + state = + txn_fragment + |> handle_txn_fragment(state) + |> maybe_garbage_collect() if state.terminating? do {:halt, state} diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 8c69d690c8..24e8aec437 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2620,7 +2620,16 @@ defmodule Electric.Shapes.ConsumerTest do ] setup(ctx) do + delay_snapshot_creation? = Map.get(ctx, :delay_snapshot_creation?) + test_pid = self() + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + if delay_snapshot_creation? do + receive do + {^test_pid, :resume} -> :ok + end + end + pg_snapshot = {10, 11, [10]} GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) GenServer.cast(parent, {:snapshot_started, shape_handle}) @@ -2746,6 +2755,70 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {^ref, :new_changes, _}, @receive_timeout end + + @tag delay_snapshot_creation?: true + test "GC runs during buffered-fragment drain when heap exceeds threshold", ctx do + # threshold=1 forces GC after every fragment processed during the buffer drain. + # The consumer starts with buffering?=true; fragments sent before pg_snapshot_known + # land in the buffer. When we unblock the snapshotter it fires pg_snapshot_known + # which triggers :consume_buffer → process_buffered_txn_fragments (our new GC call). + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + + # The snapshotter is now running but blocked on `receive {^test_pid, :resume}`. + assert_receive {:snapshot, ^shape_handle, snapshotter_pid} + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + + # Trace GC events on the consumer to count full GCs fired during the drain. + :erlang.trace(consumer_pid, true, [:garbage_collection]) + + # Send a large-payload fragment while buffering?=true — it goes into the buffer. + large_binary = :binary.copy(<<0>>, 200_000) + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Unblock the snapshotter: fires pg_snapshot_known → :consume_buffer → + # process_buffered_txn_fragments where our new maybe_garbage_collect() call fires. + send(snapshotter_pid, {self(), :resume}) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + :erlang.trace(consumer_pid, false, [:garbage_collection]) + + # Count how many full GC (garbage_collect) trace messages arrived. + # :garbage_collection traces emit {:trace, pid, :gc_major_start, info} / + # :gc_major_end pairs (one per full :erlang.garbage_collect() call). + gc_events = + Stream.repeatedly(fn -> + receive do + {:trace, ^consumer_pid, :gc_major_start, _} -> :gc + {:trace, ^consumer_pid, :gc_minor_start, _} -> :minor + {:trace, ^consumer_pid, :gc_major_end, _} -> :skip + {:trace, ^consumer_pid, :gc_minor_end, _} -> :skip + after + 0 -> :done + end + end) + |> Stream.take_while(&(&1 != :done)) + |> Enum.count(&(&1 == :gc)) + + assert gc_events >= 1, + "expected at least one full GC during buffered-fragment drain, got #{gc_events}" + end end describe "set_gc_heap_threshold helpers" do From 1b866ed65026c96108db70c7a329d0cd8c9183b3 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 15:18:49 +0200 Subject: [PATCH 10/10] feat(consumer): add hysteresis + critical-path docs for adaptive GC --- .../lib/electric/shapes/consumer.ex | 73 ++++++++++++++++++- .../lib/electric/shapes/consumer/state.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 38 ++++++++++ 3 files changed, 113 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 52b77e6947..e13d8a57b1 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -40,6 +40,14 @@ defmodule Electric.Shapes.Consumer do @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() @word_size :erlang.system_info(:wordsize) + # Minimum wall-clock interval (ms) between consumer-forced full GC sweeps. + # Prevents GC-thrashing on the replication critical path: the ShapeLogCollector + # blocks until every consumer replies, so a forced GC on every fragment (e.g. + # during a buffered-fragment drain) would add publish latency proportional to + # the number of fragments. Hysteresis caps the worst-case frequency to at most + # one forced sweep per @gc_min_interval_ms regardless of fragment rate. + @gc_min_interval_ms 1_000 + @type initialize_shape_opts() :: %{ :action => :create | :restore, optional(:otel_ctx) => OpenTelemetry.otel_ctx() | nil, @@ -114,7 +122,16 @@ defmodule Electric.Shapes.Consumer do @doc """ Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. Takes effect immediately for that stack's consumers — safe to call from IEx. + + **Critical-path note**: this GC runs synchronously on the replication path — + the ShapeLogCollector blocks until every consumer replies. Prefer a conservative + (high) threshold to minimise added publish latency. The per-process + `fullsweep_after` spawn-opt (configured via `ELECTRIC_PROCESS_SPAWN_OPTS`) is + the lower-risk lever for steady-state heap bounding; this adaptive GC is a + targeted, runtime-tunable backstop. Forced-GC frequency is capped to at most + once per `@gc_min_interval_ms` (see `should_force_gc?/5`). """ + @spec set_gc_heap_threshold(Electric.stack_id(), non_neg_integer() | nil) :: :ok def set_gc_heap_threshold(stack_id, threshold_bytes) when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, threshold_bytes) @@ -126,6 +143,7 @@ defmodule Electric.Shapes.Consumer do Returns `{:ok, number_of_stacks_updated}`. Pass nil to disable everywhere. Intended for live experimentation from an IEx shell. """ + @spec set_gc_heap_threshold_all_stacks(non_neg_integer() | nil) :: {:ok, non_neg_integer()} def set_gc_heap_threshold_all_stacks(threshold_bytes) do stack_ids = list_stack_ids() @@ -551,26 +569,77 @@ defmodule Electric.Shapes.Consumer do |> maybe_garbage_collect() end + # NOTE: this runs synchronously on the replication critical path — the + # ShapeLogCollector blocks until every consumer replies. A forced + # :erlang.garbage_collect() can add measurable publish latency, especially + # during a buffered-fragment drain where maybe_garbage_collect/1 is called + # for every queued fragment in a tight loop. + # + # Two safeguards limit the impact: + # 1. The nil fast-path exits immediately when no threshold is configured. + # 2. Hysteresis (@gc_min_interval_ms) prevents back-to-back full sweeps even + # when the consumer sits just above the threshold across many fragments. + # + # Operators should prefer a CONSERVATIVE (high) threshold. For steady-state + # heap bounding the per-process `fullsweep_after` spawn-opt (set via + # ELECTRIC_PROCESS_SPAWN_OPTS) is a lower-risk alternative; this adaptive GC + # is a targeted, runtime-tunable backstop. defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do case Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) do nil -> + # Fast path: adaptive GC is disabled — skip all process_info/time calls. state threshold -> {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) - if over_heap_threshold?(heap_words, threshold), do: :erlang.garbage_collect() - state + now = System.monotonic_time(:millisecond) + + if should_force_gc?(heap_words, threshold, state.last_forced_gc_at, now) do + :erlang.garbage_collect() + %{state | last_forced_gc_at: now} + else + state + end end end @doc false # heap_words: process total_heap_size in words; threshold_bytes: configured byte threshold (or nil) + @spec over_heap_threshold?(non_neg_integer(), non_neg_integer() | nil) :: boolean() def over_heap_threshold?(_heap_words, nil), do: false def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do heap_words * @word_size > threshold_bytes end + @doc false + # Decide whether to force a full GC sweep: heap must be over the threshold AND + # at least @gc_min_interval_ms must have elapsed since the last forced GC. + # last_gc_at / now_ms are monotonic milliseconds; last_gc_at is nil if this + # consumer has never forced a GC (always fire on first over-threshold event). + # Passing explicit min_interval_ms enables deterministic unit tests. + @spec should_force_gc?( + non_neg_integer(), + non_neg_integer() | nil, + integer() | nil, + integer(), + non_neg_integer() + ) :: boolean() + def should_force_gc?( + heap_words, + threshold_bytes, + last_gc_at, + now_ms, + min_interval_ms \\ @gc_min_interval_ms + ) + + def should_force_gc?(_heap_words, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false + + def should_force_gc?(heap_words, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do + over_heap_threshold?(heap_words, threshold_bytes) and + (is_nil(last_gc_at) or now_ms - last_gc_at >= min_interval_ms) + end + # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). # In this phase we have to buffer incoming txn fragments because we can't yet decide what to # do with the transaction: skip it or write it to the shape log. diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 9414f07571..1f8e224fde 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -41,7 +41,10 @@ defmodule Electric.Shapes.Consumer.State do # When a {Storage, :flushed, offset} message arrives during a pending # transaction, we defer the notification and store the max flushed offset # here. Multiple deferred notifications are collapsed into a single most recent offset. - pending_flush_offset: nil + pending_flush_offset: nil, + # Monotonic millisecond timestamp of the last consumer-forced GC (nil if never). + # Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency. + last_forced_gc_at: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 24e8aec437..36aac47c93 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2598,6 +2598,44 @@ defmodule Electric.Shapes.ConsumerTest do end end + describe "should_force_gc?/5" do + # All tests pass explicit now_ms / last_gc_at / min_interval_ms so they are + # fully deterministic and do not depend on wall-clock time. + + test "false when threshold is nil (adaptive GC disabled)" do + refute Electric.Shapes.Consumer.should_force_gc?(1_000_000, nil, nil, 5_000, 1_000) + end + + test "true when heap over threshold and consumer has never forced a GC (last_gc_at nil)" do + # 1 000 words * 8 bytes/word = 8 000 bytes > threshold of 1 byte + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, nil, 5_000, 1_000) + end + + test "false when heap over threshold but interval has not elapsed" do + # last_gc_at=4_500, now=5_000 → delta=500 < min_interval=1_000 → no GC + refute Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_500, 5_000, 1_000) + end + + test "true when heap over threshold and interval has elapsed" do + # last_gc_at=3_000, now=5_000 → delta=2_000 >= min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 3_000, 5_000, 1_000) + end + + test "true at exactly the min interval boundary" do + # last_gc_at=4_000, now=5_000 → delta=1_000 == min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_000, 5_000, 1_000) + end + + test "false when heap is under threshold regardless of timing" do + # heap_words=1 * wordsize (8) = 8 bytes; threshold=1_000 bytes → under + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, nil, 5_000, 1_000) + end + + test "false when heap is under threshold even if interval would have elapsed" do + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, 0, 5_000, 1_000) + end + end + describe "adaptive GC after fragment processing" do @describetag :tmp_dir