Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/replication-retry-log-throttle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@core/sync-service": patch
---

Throttle the replication event retry logging and stop logging full event payloads.

A failing replication event is retried every 50ms for up to 10 minutes. Previously every attempt logged at error level, flooding the logs (and any error tracker fed from them) with up to ~12,000 messages per incident, each embedding the full inspected event — potentially megabytes of row data. Now the first failure and one progress update every 10 seconds are logged at error level with the event identity (xid/LSN/relation) and a scrubbed failure reason, while the remaining attempts log full detail at debug level.
94 changes: 82 additions & 12 deletions packages/sync-service/lib/electric/postgres/replication_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule Electric.Postgres.ReplicationClient do
step: :disconnected,
wait_for_active_ref: nil,
pending_event: nil,
last_retry_error_log: nil,
received_wal: 0,
flushed_wal: 0,
last_seen_txn_lsn: Lsn.from_integer(0),
Expand All @@ -81,6 +82,7 @@ defmodule Electric.Postgres.ReplicationClient do
step: Electric.Postgres.ReplicationClient.step(),
wait_for_active_ref: {reference(), term()} | nil,
pending_event: {reference(), term(), non_neg_integer(), integer()} | nil,
last_retry_error_log: integer() | nil,
received_wal: non_neg_integer(),
flushed_wal: non_neg_integer(),
last_seen_txn_lsn: Lsn.t(),
Expand Down Expand Up @@ -155,6 +157,10 @@ defmodule Electric.Postgres.ReplicationClient do
# Maximum time to spend retrying a crashed event handler before giving up.
@max_event_retry_time 10 * 60_000

# How often a still-failing event retry is logged at error level; attempts
# in between are logged at debug level. See log_event_retry/6.
@retry_log_interval 10_000

@spec start_link(Keyword.t()) :: :gen_statem.start_ret()
def start_link(opts) do
config = Map.new(opts)
Expand Down Expand Up @@ -361,7 +367,7 @@ defmodule Electric.Postgres.ReplicationClient do
)
when is_reference(ref) do
Process.demonitor(ref, [:flush])
state = %{state | pending_event: nil}
state = %{state | pending_event: nil, last_retry_error_log: nil}
state = maybe_update_flush_up_to_date(state)
{acks, state} = acknowledge_transaction(event, state)
{:noreply_and_resume, acks, state}
Expand All @@ -388,15 +394,16 @@ defmodule Electric.Postgres.ReplicationClient do
state = %{state | pending_event: nil}

if remaining > 0 do
Logger.error(
"Error processing replication event (#{remaining}ms retry budget left): " <>
inspect(reason)
)
state =
log_event_retry(state, :processing, event, remaining, reason, fn -> inspect(reason) end)

Process.send_after(self(), {:process_event, event, remaining}, @event_retry_delay)
{:noreply, state}
else
Logger.error("Exhausted retry budget processing replication event: " <> inspect(reason))
Logger.error(
"Exhausted retry budget processing replication event: #{describe_event(event)} failed with: " <>
inspect_scrubbed(reason)
)

exit(reason)
end
Expand Down Expand Up @@ -540,24 +547,87 @@ defmodule Electric.Postgres.ReplicationClient do
remaining = time_remaining - (System.monotonic_time(:millisecond) - start_time)

if remaining > 0 do
Logger.error(
"Error dispatching replication event (#{remaining}ms retry budget left): " <>
Exception.format(kind, reason, __STACKTRACE__)
)
stacktrace = __STACKTRACE__

state =
log_event_retry(state, :dispatching, event, remaining, reason, fn ->
Exception.format(kind, reason, stacktrace)
end)

Process.send_after(self(), {:process_event, event, remaining}, @event_retry_delay)
{:noreply, state}
else
Logger.error(
"Exhausted retry budget dispatching replication event: " <>
Exception.format(kind, reason, __STACKTRACE__)
"Exhausted retry budget dispatching replication event: #{describe_event(event)} failed with: " <>
inspect_scrubbed(reason)
)

:erlang.raise(kind, reason, __STACKTRACE__)
end
end
end

# A failing event is retried every @event_retry_delay (50ms) for up to
# @max_event_retry_time (10 minutes), so logging every attempt at error
# level floods the log output — and any error tracker fed from it — with up
# to ~12000 messages for a single incident. Log the first failure and one
# progress update per @retry_log_interval at error level, and every other
# attempt at debug level. The throttle window is wall-clock time tracked in
# the state because a fast failure (e.g. :noproc) consumes no measurable
# retry budget.
#
# The error-level message carries the event identity instead of the full
# event: the payload can be megabytes of row data, which both bloats the
# message and copies user data into the logs. The full detail remains
# available at debug level.
defp log_event_retry(state, action, event, remaining, reason, debug_detail_fn) do
now = System.monotonic_time(:millisecond)

if is_nil(state.last_retry_error_log) or
now - state.last_retry_error_log >= @retry_log_interval do
Logger.error(
"Error #{action} replication event (#{div(remaining, 1000)}s retry budget left, retrying every #{@event_retry_delay}ms): " <>
"#{describe_event(event)} failed with: " <> inspect_scrubbed(reason)
)

%{state | last_retry_error_log: now}
else
Logger.debug(fn ->
"Error #{action} replication event (#{remaining}ms retry budget left): " <>
debug_detail_fn.()
end)

state
end
end

defp describe_event(%TransactionFragment{} = fragment) do
"transaction fragment xid=#{fragment.xid} lsn=#{fragment.lsn} changes=#{fragment.change_count}"
end

defp describe_event(%Relation{} = rel) do
~s|relation "#{rel.schema}"."#{rel.table}" (oid #{rel.id})|
end

# Replace replication events embedded in an exit reason (e.g. as arguments
# in a :noproc tuple) with their one-line summary before inspecting, so
# that error-level logs never carry full row data.
defp inspect_scrubbed(term) do
term |> scrub_events() |> inspect(limit: 100, printable_limit: 2048)
end

defp scrub_events(%TransactionFragment{} = event), do: "#<#{describe_event(event)}>"
defp scrub_events(%Relation{} = event), do: "#<#{describe_event(event)}>"
defp scrub_events(%_{} = other_struct), do: other_struct

defp scrub_events(tuple) when is_tuple(tuple),
do: tuple |> Tuple.to_list() |> Enum.map(&scrub_events/1) |> List.to_tuple()

# head/tail recursion keeps this safe for improper lists, which can show up
# in arbitrary exit reasons
defp scrub_events([head | tail]), do: [scrub_events(head) | scrub_events(tail)]
defp scrub_events(other), do: other

# Downstream returned :not_ready — subscribe to StatusMonitor for notification
# when the stack becomes active, then retry. This replaces the old blocking
# wait_until_active(timeout: :infinity) call with an async notification.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,43 @@ defmodule Electric.Postgres.ReplicationClientTest do
assert %NewRecord{record: %{"value" => "test value 2"}} = receive_tx_change()
end

@tag handle_event: {MockTransactionProcessor, :handle_event_async, []}
test "logs a failing event dispatch at error level once, not per retry attempt",
%{db_conn: conn} = ctx do
start_client(ctx)

# process one transaction so that the relation message is already
# handled and the failing event below is the transaction itself
start_supervised({MockTransactionProcessor, self()})
insert_item(conn, "first value")
assert %NewRecord{record: %{"value" => "first value"}} = receive_tx_change()

stop_supervised(MockTransactionProcessor)

error_log =
capture_log([level: :error], fn ->
# with the processor stopped every dispatch exits with :noproc and
# is retried every 50ms
insert_item(conn, "a private row value")
Process.sleep(500)
end)

error_lines =
error_log
|> String.split("\n")
|> Enum.count(&(&1 =~ "Error dispatching replication event"))

assert error_lines == 1

# the error-level log identifies the event without embedding row data
assert error_log =~ "xid="
refute error_log =~ "a private row value"

# the event is still delivered once the handler comes back up
start_supervised({MockTransactionProcessor, self()})
assert %NewRecord{record: %{"value" => "a private row value"}} = receive_tx_change()
end

@tag database_settings: ["wal_sender_timeout='3s'"]
@tag handle_event: {MockTransactionProcessor, :handle_event_async, []}
test "connection survives wal_sender_timeout when event handler is unavailable",
Expand Down
Loading