diff --git a/.changeset/telemetry-poller-safe-invoke.md b/.changeset/telemetry-poller-safe-invoke.md new file mode 100644 index 0000000000..288517fe47 --- /dev/null +++ b/.changeset/telemetry-poller-safe-invoke.md @@ -0,0 +1,8 @@ +--- +'@core/electric-telemetry': patch +'@core/sync-service': patch +--- + +Wrap telemetry-poller MFAs in `ElectricTelemetry.Poller.safe_invoke/3` so that transient collector failures (`:noproc`, `:timeout`, `:shutdown`/`:normal` exits, `ArgumentError` from not-yet-created ETS tables) no longer cause `:telemetry_poller` to permanently remove the measurement from its polling list. Unexpected errors are now logged as warnings with the offending MFA and the collector keeps being polled on subsequent ticks. Strips now-redundant defensive `try/catch` / `with`-fallthrough code from `count_shapes/2` and `report_retained_wal_size/3`. + +Note: user-supplied periodic measurement functions no longer have exceptions propagated up to `:telemetry_poller`'s own error logger — they are caught and logged via `ElectricTelemetry.Poller` instead. diff --git a/packages/electric-telemetry/lib/electric/telemetry/poller.ex b/packages/electric-telemetry/lib/electric/telemetry/poller.ex index 12607ac9d1..93af93c60c 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/poller.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/poller.ex @@ -1,4 +1,6 @@ defmodule ElectricTelemetry.Poller do + require Logger + @callback builtin_periodic_measurements(map) :: list() def child_spec(telemetry_opts, poller_opts) do @@ -25,19 +27,72 @@ defmodule ElectricTelemetry.Poller do def periodic_measurements(%{periodic_measurements: measurements} = telemetry_opts, module) do Enum.flat_map(measurements, fn - :builtin -> module.builtin_periodic_measurements(telemetry_opts) + :builtin -> + module.builtin_periodic_measurements(telemetry_opts) + # These are implemented by telemetry_poller - f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> [f] + f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> + [f] + # Bare function names are assumed to be referring to functions defined in the caller module - f when is_atom(f) -> {module, f, [telemetry_opts]} - f when is_function(f, 1) -> {__MODULE__, :user_measurement, [f, telemetry_opts]} - {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [{m, f, a ++ [telemetry_opts]}] + f when is_atom(f) -> + [wrap(module, f, [telemetry_opts])] + + f when is_function(f, 1) -> + [wrap(__MODULE__, :user_measurement, [f, telemetry_opts])] + + {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> + [wrap(m, f, a ++ [telemetry_opts])] end) end def periodic_measurements(telemetry_opts, module), - do: module.builtin_periodic_measurements(telemetry_opts) + do: Enum.map(module.builtin_periodic_measurements(telemetry_opts), &wrap_mfa/1) + + defp wrap_mfa({m, f, a}), do: wrap(m, f, a) + defp wrap_mfa(other), do: other + + defp wrap(m, f, a), do: {__MODULE__, :safe_invoke, [m, f, a]} # Helper function to enable telemetry_poller to call a user-provided anonymous function def user_measurement(f, telemetry_opts), do: f.(telemetry_opts) + + @doc """ + Invoke a periodic measurement MFA, absorbing common failure modes. + + `:telemetry_poller` removes a measurement permanently from its polling list + after the first failure. Wrapping every MFA in `safe_invoke/3` prevents that: + transient errors (GenServer restart races, ETS tables not yet created, DB + unavailability) are logged as warnings and swallowed so the measurement keeps + being polled on subsequent ticks. + """ + def safe_invoke(m, f, a) do + apply(m, f, a) + :ok + rescue + ArgumentError -> + :ok + + e -> + Logger.warning( + "Telemetry collector #{inspect(m)}.#{f}/#{length(a)} crashed: " <> + Exception.message(e) + ) + + :ok + catch + :exit, {reason, _} when reason in [:noproc, :timeout, :shutdown, :normal] -> + :ok + + :exit, reason when reason in [:noproc, :shutdown, :normal] -> + :ok + + kind, reason -> + Logger.warning( + "Telemetry collector #{inspect(m)}.#{f}/#{length(a)} #{kind}: " <> + inspect(reason) + ) + + :ok + end end diff --git a/packages/electric-telemetry/test/electric/telemetry/poller_test.exs b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs new file mode 100644 index 0000000000..db8363a7fb --- /dev/null +++ b/packages/electric-telemetry/test/electric/telemetry/poller_test.exs @@ -0,0 +1,115 @@ +defmodule ElectricTelemetry.PollerTest do + use ExUnit.Case, async: true + + import ExUnit.CaptureLog + + alias ElectricTelemetry.Poller + + defmodule Fixture do + def ok(), do: :done + def raise_argument(), do: raise(ArgumentError, "boom") + def raise_runtime(), do: raise(RuntimeError, "kaboom") + def exit_noproc(), do: exit({:noproc, {GenServer, :call, [:nowhere, :hi]}}) + def exit_timeout(), do: exit({:timeout, {GenServer, :call, [:slow, :hi]}}) + def exit_shutdown(), do: exit({:shutdown, :foo}) + def exit_normal_atom(), do: exit(:normal) + def exit_shutdown_atom(), do: exit(:shutdown) + def exit_weird(), do: exit(:weird) + def throw_it(), do: throw(:nope) + end + + describe "safe_invoke/3" do + test "returns :ok and runs the function on success" do + assert Poller.safe_invoke(Fixture, :ok, []) == :ok + end + + test "swallows ArgumentError (ETS missing, etc.) silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_argument, []) == :ok end) + refute log =~ "crashed" + end + + test "swallows generic exceptions with a warning" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_runtime, []) == :ok end) + assert log =~ "crashed" + assert log =~ "kaboom" + end + + test "swallows :noproc exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_noproc, []) == :ok end) + refute log =~ "exit" + end + + test "swallows :timeout exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_timeout, []) == :ok end) + refute log =~ "exit" + end + + test "swallows :shutdown exit silently" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown, []) == :ok end) + refute log =~ "exit" + end + + test "swallows bare :normal exit silently" do + log = + capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_normal_atom, []) == :ok end) + + refute log =~ "exit" + end + + test "swallows bare :shutdown exit silently" do + log = + capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown_atom, []) == :ok end) + + refute log =~ "exit" + end + + test "logs a warning for unexpected exits" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_weird, []) == :ok end) + assert log =~ "exit" + assert log =~ "weird" + end + + test "logs a warning for throws" do + log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :throw_it, []) == :ok end) + assert log =~ "throw" + end + end + + describe "periodic_measurements/2 wrapping" do + defmodule CallbackMod do + @behaviour ElectricTelemetry.Poller + def builtin_periodic_measurements(_opts), do: [] + def some_measurement(_opts), do: :ok + end + + test "wraps {m, f, a} tuples in safe_invoke" do + opts = %{periodic_measurements: [{CallbackMod, :some_measurement, []}]} + + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "wraps bare function atoms in safe_invoke" do + opts = %{periodic_measurements: [:some_measurement]} + + assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "wraps anonymous functions in safe_invoke around user_measurement" do + f = fn _ -> :ok end + opts = %{periodic_measurements: [f]} + + assert [ + {ElectricTelemetry.Poller, :safe_invoke, + [ElectricTelemetry.Poller, :user_measurement, [^f, _]]} + ] = + Poller.periodic_measurements(opts, CallbackMod) + end + + test "leaves telemetry_poller builtins unwrapped" do + opts = %{periodic_measurements: [:memory, :persistent_term]} + assert Poller.periodic_measurements(opts, CallbackMod) == [:memory, :persistent_term] + end + end +end diff --git a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex index ed74295ed5..1e8f1715a7 100644 --- a/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex +++ b/packages/sync-service/lib/electric/stack_supervisor/telemetry.ex @@ -29,8 +29,8 @@ defmodule Electric.StackSupervisor.Telemetry do end def count_shapes(stack_id, _telemetry_opts) do - # Telemetry is started before everything else in the stack, so we need to handle - # the case where the shape cache is not started yet. + # Telemetry is started before everything else in the stack, so handle the case where the + # shape cache is not started yet: emit what we can and skip what we can't independently. with %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} <- Electric.ShapeCache.shape_counts(stack_id) do Electric.Telemetry.OpenTelemetry.execute( @@ -104,16 +104,10 @@ defmodule Electric.StackSupervisor.Telemetry do %{stack_id: stack_id} ) catch - :exit, {:noproc, _} -> - :ok + kind, reason when kind in [:error, :exit] -> + Logger.debug("report_retained_wal_size/3 skipped: #{kind} #{inspect(reason)}") - # catch all errors to not log them as errors, those are reporing issues at best - type, reason -> - Logger.warning( - "Failed to query retained WAL size\nError: #{Exception.format(type, reason)}", - stack_id: stack_id, - slot_name: slot_name - ) + :ok end end