From 13b3967b18ec0aed71a56d274a7a71a19c290a25 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Wed, 22 Apr 2026 01:54:41 +0200 Subject: [PATCH 1/2] Centralize telemetry collector error handling in Poller.safe_invoke/3 telemetry_poller removes a measurement permanently from its polling list after the first failure, so transient collector errors (GenServer restart races, ETS tables not yet created, DB unavailable) silently disable metrics for the lifetime of the poller. Wrap every MFA built by ElectricTelemetry.Poller.periodic_measurements/2 in safe_invoke/3, which absorbs :noproc/:timeout/:shutdown/:normal exits and ArgumentError silently and logs any other failure as a warning tagged with the offending MFA. The measurement always returns :ok to telemetry_poller and stays on the polling list. Strip now-redundant defensive code from count_shapes/2 (with-fallthrough) and report_retained_wal_size/3 (try/catch on :noproc and catch-all). Refs electric-sql/alco-agent-tasks#32 --- .changeset/telemetry-poller-safe-invoke.md | 8 ++ .../lib/electric/telemetry/poller.ex | 54 ++++++++- .../test/electric/telemetry/poller_test.exs | 106 ++++++++++++++++++ .../electric/stack_supervisor/telemetry.ex | 85 ++++++-------- 4 files changed, 198 insertions(+), 55 deletions(-) create mode 100644 .changeset/telemetry-poller-safe-invoke.md create mode 100644 packages/electric-telemetry/test/electric/telemetry/poller_test.exs diff --git a/.changeset/telemetry-poller-safe-invoke.md b/.changeset/telemetry-poller-safe-invoke.md new file mode 100644 index 0000000000..288517fe47 --- /dev/null +++ b/.changeset/telemetry-poller-safe-invoke.md @@ -0,0 +1,8 @@ +--- +'@core/electric-telemetry': patch +'@core/sync-service': patch +--- + +Wrap telemetry-poller MFAs in `ElectricTelemetry.Poller.safe_invoke/3` so that transient collector failures (`:noproc`, `:timeout`, `:shutdown`/`:normal` exits, `ArgumentError` from not-yet-created ETS tables) no longer cause `:telemetry_poller` to permanently remove the measurement from its polling list. Unexpected errors are now logged as warnings with the offending MFA and the collector keeps being polled on subsequent ticks. Strips now-redundant defensive `try/catch` / `with`-fallthrough code from `count_shapes/2` and `report_retained_wal_size/3`. + +Note: user-supplied periodic measurement functions no longer have exceptions propagated up to `:telemetry_poller`'s own error logger — they are caught and logged via `ElectricTelemetry.Poller` instead. diff --git a/packages/electric-telemetry/lib/electric/telemetry/poller.ex b/packages/electric-telemetry/lib/electric/telemetry/poller.ex index 12607ac9d1..853c5654ba 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/poller.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/poller.ex @@ -1,4 +1,6 @@ defmodule ElectricTelemetry.Poller do + require Logger + @callback builtin_periodic_measurements(map) :: list() def child_spec(telemetry_opts, poller_opts) do @@ -29,15 +31,59 @@ defmodule ElectricTelemetry.Poller do # These are implemented by telemetry_poller f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> [f] # Bare function names are assumed to be referring to functions defined in the caller module - f when is_atom(f) -> {module, f, [telemetry_opts]} - f when is_function(f, 1) -> {__MODULE__, :user_measurement, [f, telemetry_opts]} - {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [{m, f, a ++ [telemetry_opts]}] + f when is_atom(f) -> [wrap(module, f, [telemetry_opts])] + f when is_function(f, 1) -> [wrap(__MODULE__, :user_measurement, [f, telemetry_opts])] + {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [wrap(m, f, a ++ [telemetry_opts])] end) end def periodic_measurements(telemetry_opts, module), - do: module.builtin_periodic_measurements(telemetry_opts) + do: Enum.map(module.builtin_periodic_measurements(telemetry_opts), &wrap_mfa/1) + + defp wrap_mfa({m, f, a}), do: wrap(m, f, a) + defp wrap_mfa(other), do: other + + defp wrap(m, f, a), do: {__MODULE__, :safe_invoke, [m, f, a]} # Helper function to enable telemetry_poller to call a user-provided anonymous function def user_measurement(f, telemetry_opts), do: f.(telemetry_opts) + + @doc """ + Invoke a periodic measurement MFA, absorbing common failure modes. + + `:telemetry_poller` removes a measurement permanently from its polling list + after the first failure. Wrapping every MFA in `safe_invoke/3` prevents that: + transient errors (GenServer restart races, ETS tables not yet created, DB + unavailability) are logged as warnings and swallowed so the measurement keeps + being polled on subsequent ticks. + """ + def safe_invoke(m, f, a) do + apply(m, f, a) + :ok + rescue + ArgumentError -> + :ok + + e -> + Logger.warning( + "Telemetry collector #{inspect(m)}.#{f}/#{length(a)} crashed: " <> + Exception.message(e) + ) + + :ok + catch + :exit, {reason, _} when reason in [:noproc, :timeout, :shutdown, :normal] -> + :ok + + :exit, reason when reason in [:noproc, :shutdown, :normal] -> + :ok + + kind, reason -> + Logger.warning( + "Telemetry collector #{inspect(m)}.#{f}/#{length(a)} #{kind}: " <> + inspect(reason) + ) + + :ok + end end diff --git a/packages/electric-telemetry/test/electric/telemetry/poller_test.exs b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs new file mode 100644 index 0000000000..a0e20e2395 --- /dev/null +++ b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs @@ -0,0 +1,106 @@ +defmodule ElectricTelemetry.PollerTest do + use ExUnit.Case, async: true + + import ExUnit.CaptureLog + + alias ElectricTelemetry.Poller + + defmodule Fixture do + def ok(), do: :done + def raise_argument(), do: raise(ArgumentError, "boom") + def raise_runtime(), do: raise(RuntimeError, "kaboom") + def exit_noproc(), do: exit({:noproc, {GenServer, :call, [:nowhere, :hi]}}) + def exit_timeout(), do: exit({:timeout, {GenServer, :call, [:slow, :hi]}}) + def exit_shutdown(), do: exit({:shutdown, :foo}) + def exit_normal_atom(), do: exit(:normal) + def exit_shutdown_atom(), do: exit(:shutdown) + def exit_weird(), do: exit(:weird) + def throw_it(), do: throw(:nope) + end + + describe "safe_invoke/3" do + test "returns :ok and runs the function on success" do + assert Poller.safe_invoke(Fixture, :ok, []) == :ok + end + + test "swallows ArgumentError (ETS missing, etc.) silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_argument, []) == :ok end) + refute log =~ "crashed" + end + + test "swallows generic exceptions with a warning" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_runtime, []) == :ok end) + assert log =~ "crashed" + assert log =~ "kaboom" + end + + test "swallows :noproc exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_noproc, []) == :ok end) + refute log =~ "exit" + end + + test "swallows :timeout exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_timeout, []) == :ok end) + refute log =~ "exit" + end + + test "swallows :shutdown exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown, []) == :ok end) + refute log =~ "exit" + end + + test "swallows bare :normal exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_normal_atom, []) == :ok end) + refute log =~ "exit" + end + + test "swallows bare :shutdown exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown_atom, []) == :ok end) + refute log =~ "exit" + end + + test "logs a warning for unexpected exits" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_weird, []) == :ok end) + assert log =~ "exit" + assert log =~ "weird" + end + + test "logs a warning for throws" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :throw_it, []) == :ok end) + assert log =~ "throw" + end + end + + describe "periodic_measurements/2 wrapping" do + defmodule CallbackMod do + @behaviour ElectricTelemetry.Poller + def builtin_periodic_measurements(_opts), do: [] + def some_measurement(_opts), do: :ok + end + + test "wraps {m, f, a} tuples in safe_invoke" do + opts = %{periodic_measurements: [{CallbackMod, :some_measurement, []}]} + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "wraps bare function atoms in safe_invoke" do + opts = %{periodic_measurements: [:some_measurement]} + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "wraps anonymous functions in safe_invoke around user_measurement" do + f = fn _ -> :ok end + opts = %{periodic_measurements: [f]} + + assert [{ElectricTelemetry.Poller, :safe_invoke, [ElectricTelemetry.Poller, :user_measurement, [^f, _]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "leaves telemetry_poller builtins unwrapped" do + opts = %{periodic_measurements: [:memory, :persistent_term]} + assert Poller.periodic_measurements(opts, CallbackMod) == [:memory, :persistent_term] + end + end +end diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index ed74295ed5..1206ded160 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -1,6 +1,4 @@ defmodule Electric.StackSupervisor.Telemetry do - require Logger - def configure(config) do # Set shared OpenTelemetry span attributes for the given stack. They are stored in # persistent_term so it doesn't matter which process this function is called from. @@ -29,16 +27,14 @@ defmodule Electric.StackSupervisor.Telemetry do end def count_shapes(stack_id, _telemetry_opts) do - # Telemetry is started before everything else in the stack, so we need to handle - # the case where the shape cache is not started yet. - with %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} <- - Electric.ShapeCache.shape_counts(stack_id) do - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :shapes, :total_shapes], - %{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes}, - %{stack_id: stack_id} - ) - end + %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} = + Electric.ShapeCache.shape_counts(stack_id) + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :shapes, :total_shapes], + %{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes}, + %{stack_id: stack_id} + ) Electric.Telemetry.OpenTelemetry.execute( [:electric, :shapes, :active_shapes], @@ -74,47 +70,34 @@ defmodule Electric.StackSupervisor.Telemetry do @doc false @spec report_retained_wal_size(Electric.stack_id(), binary(), map()) :: :ok def report_retained_wal_size(stack_id, slot_name, _telemetry_opts) do - try do - %Postgrex.Result{rows: [[pg_wal_offset, retained_wal_size, confirmed_flush_lsn_lag]]} = - Postgrex.query!( - Electric.Connection.Manager.admin_pool(stack_id), - @retained_wal_size_query, - [slot_name], - timeout: 3_000, - deadline: 3_000 - ) - - # The query above can return `-1` for `confirmed_flush_lsn_lag` which means that Electric - # is caught up with Postgres' replication stream. - # This is a confusing stat if we're measuring in bytes, so use 0 as the bottom limit. - - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :postgres, :replication], - %{ - # The absolute value of pg_current_wal_lsn() doesn't convey any useful info but by - # plotting its rate of change we can see how fast the WAL is growing. - # - # We shift the absolute value of pg_current_wal_lsn() by -2**63 in the query above - # to make sure it fits inside the signed 64-bit integer type expected by the - # OpenTelemetry Protocol, - pg_wal_offset: pg_wal_offset, - slot_retained_wal_size: retained_wal_size, - slot_confirmed_flush_lsn_lag: max(0, confirmed_flush_lsn_lag) - }, - %{stack_id: stack_id} + %Postgrex.Result{rows: [[pg_wal_offset, retained_wal_size, confirmed_flush_lsn_lag]]} = + Postgrex.query!( + Electric.Connection.Manager.admin_pool(stack_id), + @retained_wal_size_query, + [slot_name], + timeout: 3_000, + deadline: 3_000 ) - catch - :exit, {:noproc, _} -> - :ok - # catch all errors to not log them as errors, those are reporing issues at best - type, reason -> - Logger.warning( - "Failed to query retained WAL size\nError: #{Exception.format(type, reason)}", - stack_id: stack_id, - slot_name: slot_name - ) - end + # The query above can return `-1` for `confirmed_flush_lsn_lag` which means that Electric + # is caught up with Postgres' replication stream. + # This is a confusing stat if we're measuring in bytes, so use 0 as the bottom limit. + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :postgres, :replication], + %{ + # The absolute value of pg_current_wal_lsn() doesn't convey any useful info but by + # plotting its rate of change we can see how fast the WAL is growing. + # + # We shift the absolute value of pg_current_wal_lsn() by -2**63 in the query above + # to make sure it fits inside the signed 64-bit integer type expected by the + # OpenTelemetry Protocol, + pg_wal_offset: pg_wal_offset, + slot_retained_wal_size: retained_wal_size, + slot_confirmed_flush_lsn_lag: max(0, confirmed_flush_lsn_lag) + }, + %{stack_id: stack_id} + ) end if Code.ensure_loaded?(ElectricTelemetry.DiskUsage) do From 275e0d9a64bfabbcaa03b3585104fe1a8822cbd1 Mon Sep 17 00:00:00 2001 From: erik-the-implementer Date: Wed, 22 Apr 2026 02:03:58 +0200 Subject: [PATCH 2/2] Restore local defensive handling in StackSupervisor.Telemetry + formatting - count_shapes/2: restore `with` fallthrough on `:error` from shape_counts and emit `active_shapes` independently so a shape-cache outage doesn't drop both metrics for the tick (Codex feedback). - report_retained_wal_size/3: restore try/catch around the Postgrex call so direct callers (including the stack-down regression test) don't crash on transient DB/pool failures (Codex feedback). - Reformat poller.ex and poller_test.exs per `mix format`. safe_invoke/3 stays as the backstop for unexpected errors through the poller wrapper; these local handlers ensure graceful partial emission for known stack-startup states. --- .../lib/electric/telemetry/poller.ex | 19 +++-- .../test/electric/telemetry/poller_test.exs | 15 +++- .../electric/stack_supervisor/telemetry.ex | 79 +++++++++++-------- 3 files changed, 71 insertions(+), 42 deletions(-) diff --git a/packages/electric-telemetry/lib/electric/telemetry/poller.ex b/packages/electric-telemetry/lib/electric/telemetry/poller.ex index 853c5654ba..93af93c60c 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/poller.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/poller.ex @@ -27,13 +27,22 @@ defmodule ElectricTelemetry.Poller do def periodic_measurements(%{periodic_measurements: measurements} = telemetry_opts, module) do Enum.flat_map(measurements, fn - :builtin -> module.builtin_periodic_measurements(telemetry_opts) + :builtin -> + module.builtin_periodic_measurements(telemetry_opts) + # These are implemented by telemetry_poller - f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> [f] + f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> + [f] + # Bare function names are assumed to be referring to functions defined in the caller module - f when is_atom(f) -> [wrap(module, f, [telemetry_opts])] - f when is_function(f, 1) -> [wrap(__MODULE__, :user_measurement, [f, telemetry_opts])] - {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [wrap(m, f, a ++ [telemetry_opts])] + f when is_atom(f) -> + [wrap(module, f, [telemetry_opts])] + + f when is_function(f, 1) -> + [wrap(__MODULE__, :user_measurement, [f, telemetry_opts])] + + {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> + [wrap(m, f, a ++ [telemetry_opts])] end) end diff --git a/packages/electric-telemetry/test/electric/telemetry/poller_test.exs b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs index a0e20e2395..db8363a7fb 100644 --- a/packages/electric-telemetry/test/electric/telemetry/poller_test.exs +++ b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs @@ -50,12 +50,16 @@ defmodule ElectricTelemetry.PollerTest do end test "swallows bare :normal exit silently" do - log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_normal_atom, []) == :ok end) + log = + capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_normal_atom, []) == :ok end) + refute log =~ "exit" end test "swallows bare :shutdown exit silently" do - log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown_atom, []) == :ok end) + log = + capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown_atom, []) == :ok end) + refute log =~ "exit" end @@ -80,12 +84,14 @@ defmodule ElectricTelemetry.PollerTest do test "wraps {m, f, a} tuples in safe_invoke" do opts = %{periodic_measurements: [{CallbackMod, :some_measurement, []}]} + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = Poller.periodic_measurements(opts, CallbackMod) end test "wraps bare function atoms in safe_invoke" do opts = %{periodic_measurements: [:some_measurement]} + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = Poller.periodic_measurements(opts, CallbackMod) end @@ -94,7 +100,10 @@ defmodule ElectricTelemetry.PollerTest do f = fn _ -> :ok end opts = %{periodic_measurements: [f]} - assert [{ElectricTelemetry.Poller, :safe_invoke, [ElectricTelemetry.Poller, :user_measurement, [^f, _]]}] = + assert [ + {ElectricTelemetry.Poller, :safe_invoke, + [ElectricTelemetry.Poller, :user_measurement, [^f, _]]} + ] = Poller.periodic_measurements(opts, CallbackMod) end diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index 1206ded160..1e8f1715a7 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -1,4 +1,6 @@ defmodule Electric.StackSupervisor.Telemetry do + require Logger + def configure(config) do # Set shared OpenTelemetry span attributes for the given stack. They are stored in # persistent_term so it doesn't matter which process this function is called from. @@ -27,14 +29,16 @@ defmodule Electric.StackSupervisor.Telemetry do end def count_shapes(stack_id, _telemetry_opts) do - %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} = - Electric.ShapeCache.shape_counts(stack_id) - - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :shapes, :total_shapes], - %{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes}, - %{stack_id: stack_id} - ) + # Telemetry is started before everything else in the stack, so handle the case where the + # shape cache is not started yet: emit what we can and skip what we can't independently. + with %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} <- + Electric.ShapeCache.shape_counts(stack_id) do + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :shapes, :total_shapes], + %{count: num_shapes, count_indexed: indexed_shapes, count_unindexed: unindexed_shapes}, + %{stack_id: stack_id} + ) + end Electric.Telemetry.OpenTelemetry.execute( [:electric, :shapes, :active_shapes], @@ -70,34 +74,41 @@ defmodule Electric.StackSupervisor.Telemetry do @doc false @spec report_retained_wal_size(Electric.stack_id(), binary(), map()) :: :ok def report_retained_wal_size(stack_id, slot_name, _telemetry_opts) do - %Postgrex.Result{rows: [[pg_wal_offset, retained_wal_size, confirmed_flush_lsn_lag]]} = - Postgrex.query!( - Electric.Connection.Manager.admin_pool(stack_id), - @retained_wal_size_query, - [slot_name], - timeout: 3_000, - deadline: 3_000 - ) + try do + %Postgrex.Result{rows: [[pg_wal_offset, retained_wal_size, confirmed_flush_lsn_lag]]} = + Postgrex.query!( + Electric.Connection.Manager.admin_pool(stack_id), + @retained_wal_size_query, + [slot_name], + timeout: 3_000, + deadline: 3_000 + ) - # The query above can return `-1` for `confirmed_flush_lsn_lag` which means that Electric - # is caught up with Postgres' replication stream. - # This is a confusing stat if we're measuring in bytes, so use 0 as the bottom limit. + # The query above can return `-1` for `confirmed_flush_lsn_lag` which means that Electric + # is caught up with Postgres' replication stream. + # This is a confusing stat if we're measuring in bytes, so use 0 as the bottom limit. + + Electric.Telemetry.OpenTelemetry.execute( + [:electric, :postgres, :replication], + %{ + # The absolute value of pg_current_wal_lsn() doesn't convey any useful info but by + # plotting its rate of change we can see how fast the WAL is growing. + # + # We shift the absolute value of pg_current_wal_lsn() by -2**63 in the query above + # to make sure it fits inside the signed 64-bit integer type expected by the + # OpenTelemetry Protocol, + pg_wal_offset: pg_wal_offset, + slot_retained_wal_size: retained_wal_size, + slot_confirmed_flush_lsn_lag: max(0, confirmed_flush_lsn_lag) + }, + %{stack_id: stack_id} + ) + catch + kind, reason when kind in [:error, :exit] -> + Logger.debug("report_retained_wal_size/3 skipped: #{kind} #{inspect(reason)}") - Electric.Telemetry.OpenTelemetry.execute( - [:electric, :postgres, :replication], - %{ - # The absolute value of pg_current_wal_lsn() doesn't convey any useful info but by - # plotting its rate of change we can see how fast the WAL is growing. - # - # We shift the absolute value of pg_current_wal_lsn() by -2**63 in the query above - # to make sure it fits inside the signed 64-bit integer type expected by the - # OpenTelemetry Protocol, - pg_wal_offset: pg_wal_offset, - slot_retained_wal_size: retained_wal_size, - slot_confirmed_flush_lsn_lag: max(0, confirmed_flush_lsn_lag) - }, - %{stack_id: stack_id} - ) + :ok + end end if Code.ensure_loaded?(ElectricTelemetry.DiskUsage) do