From cb4a3bf257ee98d51452e9ecf049971fb02d41dc Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 11 Jun 2026 15:24:39 +0200 Subject: [PATCH 1/2] fix: throttle replication event retry logging and scrub event payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A failing replication event is retried every @event_retry_delay (50ms) for up to @max_event_retry_time (10 minutes) and every attempt was logged at error level — up to ~12000 error logs per incident, flooding log output and any error tracker fed from it (the cloud ships every Logger.error line to Sentry). Each message also embedded the full inspected event, copying potentially megabytes of customer row data into the logs on every 50ms attempt. Now the first failure and one progress update per @retry_log_interval (10s) are logged at error level, carrying the event identity (xid/lsn/relation) and the failure reason with any embedded replication events replaced by their one-line summary. The attempts in between log the full detail at debug level. The throttle window is wall-clock time tracked in the client state, because a fast failure such as :noproc consumes no measurable retry budget. Fixes part of #4563. Co-Authored-By: Claude Fable 5 --- .changeset/replication-retry-log-throttle.md | 7 ++ .../electric/postgres/replication_client.ex | 90 ++++++++++++++++--- .../postgres/replication_client_test.exs | 37 ++++++++ 3 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 .changeset/replication-retry-log-throttle.md diff --git a/.changeset/replication-retry-log-throttle.md b/.changeset/replication-retry-log-throttle.md new file mode 100644 index 0000000000..c8dbebc0fb --- /dev/null +++ b/.changeset/replication-retry-log-throttle.md @@ -0,0 +1,7 @@ +--- +"@core/sync-service": patch +--- + +Throttle the replication event retry logging and stop logging full event payloads. + +A failing replication event is retried every 50ms for up to 10 minutes. Previously every attempt logged at error level, flooding the logs (and any error tracker fed from them) with up to ~12,000 messages per incident, each embedding the full inspected event — potentially megabytes of row data. Now the first failure and one progress update every 10 seconds are logged at error level with the event identity (xid/LSN/relation) and a scrubbed failure reason, while the remaining attempts log full detail at debug level. diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 7d29f165a4..09bb8c4be8 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -55,6 +55,7 @@ defmodule Electric.Postgres.ReplicationClient do step: :disconnected, wait_for_active_ref: nil, pending_event: nil, + last_retry_error_log: nil, received_wal: 0, flushed_wal: 0, last_seen_txn_lsn: Lsn.from_integer(0), @@ -81,6 +82,7 @@ defmodule Electric.Postgres.ReplicationClient do step: Electric.Postgres.ReplicationClient.step(), wait_for_active_ref: {reference(), term()} | nil, pending_event: {reference(), term(), non_neg_integer(), integer()} | nil, + last_retry_error_log: integer() | nil, received_wal: non_neg_integer(), flushed_wal: non_neg_integer(), last_seen_txn_lsn: Lsn.t(), @@ -155,6 +157,10 @@ defmodule Electric.Postgres.ReplicationClient do # Maximum time to spend retrying a crashed event handler before giving up. @max_event_retry_time 10 * 60_000 + # How often a still-failing event retry is logged at error level; attempts + # in between are logged at debug level. See log_event_retry/6. + @retry_log_interval 10_000 + @spec start_link(Keyword.t()) :: :gen_statem.start_ret() def start_link(opts) do config = Map.new(opts) @@ -361,7 +367,7 @@ defmodule Electric.Postgres.ReplicationClient do ) when is_reference(ref) do Process.demonitor(ref, [:flush]) - state = %{state | pending_event: nil} + state = %{state | pending_event: nil, last_retry_error_log: nil} state = maybe_update_flush_up_to_date(state) {acks, state} = acknowledge_transaction(event, state) {:noreply_and_resume, acks, state} @@ -388,15 +394,15 @@ defmodule Electric.Postgres.ReplicationClient do state = %{state | pending_event: nil} if remaining > 0 do - Logger.error( - "Error processing replication event (#{remaining}ms retry budget left): " <> - inspect(reason) - ) + state = log_event_retry(state, :processing, event, remaining, reason, fn -> inspect(reason) end) Process.send_after(self(), {:process_event, event, remaining}, @event_retry_delay) {:noreply, state} else - Logger.error("Exhausted retry budget processing replication event: " <> inspect(reason)) + Logger.error( + "Exhausted retry budget processing replication event: #{describe_event(event)} failed with: " <> + inspect_scrubbed(reason) + ) exit(reason) end @@ -540,17 +546,19 @@ defmodule Electric.Postgres.ReplicationClient do remaining = time_remaining - (System.monotonic_time(:millisecond) - start_time) if remaining > 0 do - Logger.error( - "Error dispatching replication event (#{remaining}ms retry budget left): " <> - Exception.format(kind, reason, __STACKTRACE__) - ) + stacktrace = __STACKTRACE__ + + state = + log_event_retry(state, :dispatching, event, remaining, reason, fn -> + Exception.format(kind, reason, stacktrace) + end) Process.send_after(self(), {:process_event, event, remaining}, @event_retry_delay) {:noreply, state} else Logger.error( - "Exhausted retry budget dispatching replication event: " <> - Exception.format(kind, reason, __STACKTRACE__) + "Exhausted retry budget dispatching replication event: #{describe_event(event)} failed with: " <> + inspect_scrubbed(reason) ) :erlang.raise(kind, reason, __STACKTRACE__) @@ -558,6 +566,64 @@ defmodule Electric.Postgres.ReplicationClient do end end + # A failing event is retried every @event_retry_delay (50ms) for up to + # @max_event_retry_time (10 minutes), so logging every attempt at error + # level floods the log output — and any error tracker fed from it — with up + # to ~12000 messages for a single incident. Log the first failure and one + # progress update per @retry_log_interval at error level, and every other + # attempt at debug level. The throttle window is wall-clock time tracked in + # the state because a fast failure (e.g. :noproc) consumes no measurable + # retry budget. + # + # The error-level message carries the event identity instead of the full + # event: the payload can be megabytes of row data, which both bloats the + # message and copies user data into the logs. The full detail remains + # available at debug level. + defp log_event_retry(state, action, event, remaining, reason, debug_detail_fn) do + now = System.monotonic_time(:millisecond) + + if is_nil(state.last_retry_error_log) or + now - state.last_retry_error_log >= @retry_log_interval do + Logger.error( + "Error #{action} replication event (#{div(remaining, 1000)}s retry budget left, retrying every #{@event_retry_delay}ms): " <> + "#{describe_event(event)} failed with: " <> inspect_scrubbed(reason) + ) + + %{state | last_retry_error_log: now} + else + Logger.debug(fn -> + "Error #{action} replication event (#{remaining}ms retry budget left): " <> + debug_detail_fn.() + end) + + state + end + end + + defp describe_event(%TransactionFragment{} = fragment) do + "transaction fragment xid=#{fragment.xid} lsn=#{fragment.lsn} changes=#{fragment.change_count}" + end + + defp describe_event(%Relation{} = rel) do + ~s|relation "#{rel.schema}"."#{rel.table}" (oid #{rel.id})| + end + + # Replace replication events embedded in an exit reason (e.g. as arguments + # in a :noproc tuple) with their one-line summary before inspecting, so + # that error-level logs never carry full row data. + defp inspect_scrubbed(term) do + term |> scrub_events() |> inspect(limit: 100, printable_limit: 2048) + end + + defp scrub_events(%TransactionFragment{} = event), do: "#<#{describe_event(event)}>" + defp scrub_events(%Relation{} = event), do: "#<#{describe_event(event)}>" + + defp scrub_events(tuple) when is_tuple(tuple), + do: tuple |> Tuple.to_list() |> Enum.map(&scrub_events/1) |> List.to_tuple() + + defp scrub_events(list) when is_list(list), do: Enum.map(list, &scrub_events/1) + defp scrub_events(other), do: other + # Downstream returned :not_ready — subscribe to StatusMonitor for notification # when the stack becomes active, then retry. This replaces the old blocking # wait_until_active(timeout: :infinity) call with an async notification. diff --git a/packages/sync-service/test/electric/postgres/replication_client_test.exs b/packages/sync-service/test/electric/postgres/replication_client_test.exs index a0c41b7215..61d5835259 100644 --- a/packages/sync-service/test/electric/postgres/replication_client_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client_test.exs @@ -273,6 +273,43 @@ defmodule Electric.Postgres.ReplicationClientTest do assert %NewRecord{record: %{"value" => "test value 2"}} = receive_tx_change() end + @tag handle_event: {MockTransactionProcessor, :handle_event_async, []} + test "logs a failing event dispatch at error level once, not per retry attempt", + %{db_conn: conn} = ctx do + start_client(ctx) + + # process one transaction so that the relation message is already + # handled and the failing event below is the transaction itself + start_supervised({MockTransactionProcessor, self()}) + insert_item(conn, "first value") + assert %NewRecord{record: %{"value" => "first value"}} = receive_tx_change() + + stop_supervised(MockTransactionProcessor) + + error_log = + capture_log([level: :error], fn -> + # with the processor stopped every dispatch exits with :noproc and + # is retried every 50ms + insert_item(conn, "a private row value") + Process.sleep(500) + end) + + error_lines = + error_log + |> String.split("\n") + |> Enum.count(&(&1 =~ "Error dispatching replication event")) + + assert error_lines == 1 + + # the error-level log identifies the event without embedding row data + assert error_log =~ "xid=" + refute error_log =~ "a private row value" + + # the event is still delivered once the handler comes back up + start_supervised({MockTransactionProcessor, self()}) + assert %NewRecord{record: %{"value" => "a private row value"}} = receive_tx_change() + end + @tag database_settings: ["wal_sender_timeout='3s'"] @tag handle_event: {MockTransactionProcessor, :handle_event_async, []} test "connection survives wal_sender_timeout when event handler is unavailable", From 29a5fcbec60d2d78c021dd29482e7b48c792de5b Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Thu, 11 Jun 2026 15:27:22 +0200 Subject: [PATCH 2/2] fix: harden exit-reason scrubbing and reformat Make scrub_events safe for improper lists and leave other structs untouched, and apply mix format to the retry logging call site. Co-Authored-By: Claude Fable 5 --- .../lib/electric/postgres/replication_client.ex | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 09bb8c4be8..dec46c9425 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -394,7 +394,8 @@ defmodule Electric.Postgres.ReplicationClient do state = %{state | pending_event: nil} if remaining > 0 do - state = log_event_retry(state, :processing, event, remaining, reason, fn -> inspect(reason) end) + state = + log_event_retry(state, :processing, event, remaining, reason, fn -> inspect(reason) end) Process.send_after(self(), {:process_event, event, remaining}, @event_retry_delay) {:noreply, state} @@ -617,11 +618,14 @@ defmodule Electric.Postgres.ReplicationClient do defp scrub_events(%TransactionFragment{} = event), do: "#<#{describe_event(event)}>" defp scrub_events(%Relation{} = event), do: "#<#{describe_event(event)}>" + defp scrub_events(%_{} = other_struct), do: other_struct defp scrub_events(tuple) when is_tuple(tuple), do: tuple |> Tuple.to_list() |> Enum.map(&scrub_events/1) |> List.to_tuple() - defp scrub_events(list) when is_list(list), do: Enum.map(list, &scrub_events/1) + # head/tail recursion keeps this safe for improper lists, which can show up + # in arbitrary exit reasons + defp scrub_events([head | tail]), do: [scrub_events(head) | scrub_events(tail)] defp scrub_events(other), do: other # Downstream returned :not_ready — subscribe to StatusMonitor for notification