Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/telemetry-poller-safe-invoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@core/electric-telemetry': patch
'@core/sync-service': patch
---

Wrap telemetry-poller MFAs in `ElectricTelemetry.Poller.safe_invoke/3` so that transient collector failures (`:noproc`, `:timeout`, `:shutdown`/`:normal` exits, `ArgumentError` from not-yet-created ETS tables) no longer cause `:telemetry_poller` to permanently remove the measurement from its polling list. Unexpected errors are now logged as warnings with the offending MFA and the collector keeps being polled on subsequent ticks. Strips now-redundant defensive `try/catch` / `with`-fallthrough code from `count_shapes/2` and `report_retained_wal_size/3`.

Note: user-supplied periodic measurement functions no longer have exceptions propagated up to `:telemetry_poller`'s own error logger — they are caught and logged via `ElectricTelemetry.Poller` instead.
67 changes: 61 additions & 6 deletions packages/electric-telemetry/lib/electric/telemetry/poller.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule ElectricTelemetry.Poller do
require Logger

@callback builtin_periodic_measurements(map) :: list()

def child_spec(telemetry_opts, poller_opts) do
Expand All @@ -25,19 +27,72 @@ defmodule ElectricTelemetry.Poller do

def periodic_measurements(%{periodic_measurements: measurements} = telemetry_opts, module) do
Enum.flat_map(measurements, fn
:builtin -> module.builtin_periodic_measurements(telemetry_opts)
:builtin ->
module.builtin_periodic_measurements(telemetry_opts)

# These are implemented by telemetry_poller
f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] -> [f]
f when f in [:memory, :persistent_term, :system_counts, :total_run_queue_lengths] ->
[f]

# Bare function names are assumed to be referring to functions defined in the caller module
f when is_atom(f) -> {module, f, [telemetry_opts]}
f when is_function(f, 1) -> {__MODULE__, :user_measurement, [f, telemetry_opts]}
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) -> [{m, f, a ++ [telemetry_opts]}]
f when is_atom(f) ->
[wrap(module, f, [telemetry_opts])]

f when is_function(f, 1) ->
[wrap(__MODULE__, :user_measurement, [f, telemetry_opts])]

{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
[wrap(m, f, a ++ [telemetry_opts])]
end)
end

def periodic_measurements(telemetry_opts, module),
do: module.builtin_periodic_measurements(telemetry_opts)
do: Enum.map(module.builtin_periodic_measurements(telemetry_opts), &wrap_mfa/1)

defp wrap_mfa({m, f, a}), do: wrap(m, f, a)
defp wrap_mfa(other), do: other

defp wrap(m, f, a), do: {__MODULE__, :safe_invoke, [m, f, a]}

# Helper function to enable telemetry_poller to call a user-provided anonymous function
def user_measurement(f, telemetry_opts), do: f.(telemetry_opts)

@doc """
Invoke a periodic measurement MFA, absorbing common failure modes.

`:telemetry_poller` removes a measurement permanently from its polling list
after the first failure. Wrapping every MFA in `safe_invoke/3` prevents that:
transient errors (GenServer restart races, ETS tables not yet created, DB
unavailability) are logged as warnings and swallowed so the measurement keeps
being polled on subsequent ticks.
"""
def safe_invoke(m, f, a) do
apply(m, f, a)
:ok
rescue
ArgumentError ->
:ok

e ->
Logger.warning(
"Telemetry collector #{inspect(m)}.#{f}/#{length(a)} crashed: " <>
Exception.message(e)
)

:ok
catch
:exit, {reason, _} when reason in [:noproc, :timeout, :shutdown, :normal] ->
:ok

:exit, reason when reason in [:noproc, :shutdown, :normal] ->
:ok

kind, reason ->
Logger.warning(
"Telemetry collector #{inspect(m)}.#{f}/#{length(a)} #{kind}: " <>
inspect(reason)
)

:ok
end
end
115 changes: 115 additions & 0 deletions packages/electric-telemetry/test/electric/telemetry/poller_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
defmodule ElectricTelemetry.PollerTest do
use ExUnit.Case, async: true

import ExUnit.CaptureLog

alias ElectricTelemetry.Poller

defmodule Fixture do
def ok(), do: :done
def raise_argument(), do: raise(ArgumentError, "boom")
def raise_runtime(), do: raise(RuntimeError, "kaboom")
def exit_noproc(), do: exit({:noproc, {GenServer, :call, [:nowhere, :hi]}})
def exit_timeout(), do: exit({:timeout, {GenServer, :call, [:slow, :hi]}})
def exit_shutdown(), do: exit({:shutdown, :foo})
def exit_normal_atom(), do: exit(:normal)
def exit_shutdown_atom(), do: exit(:shutdown)
def exit_weird(), do: exit(:weird)
def throw_it(), do: throw(:nope)
end

describe "safe_invoke/3" do
test "returns :ok and runs the function on success" do
assert Poller.safe_invoke(Fixture, :ok, []) == :ok
end

test "swallows ArgumentError (ETS missing, etc.) silently" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_argument, []) == :ok end)
refute log =~ "crashed"
end

test "swallows generic exceptions with a warning" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :raise_runtime, []) == :ok end)
assert log =~ "crashed"
assert log =~ "kaboom"
end

test "swallows :noproc exit silently" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_noproc, []) == :ok end)
refute log =~ "exit"
end

test "swallows :timeout exit silently" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_timeout, []) == :ok end)
refute log =~ "exit"
end

test "swallows :shutdown exit silently" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown, []) == :ok end)
refute log =~ "exit"
end

test "swallows bare :normal exit silently" do
log =
capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_normal_atom, []) == :ok end)

refute log =~ "exit"
end

test "swallows bare :shutdown exit silently" do
log =
capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_shutdown_atom, []) == :ok end)

refute log =~ "exit"
end

test "logs a warning for unexpected exits" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :exit_weird, []) == :ok end)
assert log =~ "exit"
assert log =~ "weird"
end

test "logs a warning for throws" do
log = capture_log(fn -> assert Poller.safe_invoke(Fixture, :throw_it, []) == :ok end)
assert log =~ "throw"
end
end

describe "periodic_measurements/2 wrapping" do
defmodule CallbackMod do
@behaviour ElectricTelemetry.Poller
def builtin_periodic_measurements(_opts), do: []
def some_measurement(_opts), do: :ok
end

test "wraps {m, f, a} tuples in safe_invoke" do
opts = %{periodic_measurements: [{CallbackMod, :some_measurement, []}]}

assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] =
Poller.periodic_measurements(opts, CallbackMod)
end

test "wraps bare function atoms in safe_invoke" do
opts = %{periodic_measurements: [:some_measurement]}

assert [{ElectricTelemetry.Poller, :safe_invoke, [CallbackMod, :some_measurement, [_]]}] =
Poller.periodic_measurements(opts, CallbackMod)
end

test "wraps anonymous functions in safe_invoke around user_measurement" do
f = fn _ -> :ok end
opts = %{periodic_measurements: [f]}

assert [
{ElectricTelemetry.Poller, :safe_invoke,
[ElectricTelemetry.Poller, :user_measurement, [^f, _]]}
] =
Poller.periodic_measurements(opts, CallbackMod)
end

test "leaves telemetry_poller builtins unwrapped" do
opts = %{periodic_measurements: [:memory, :persistent_term]}
assert Poller.periodic_measurements(opts, CallbackMod) == [:memory, :persistent_term]
end
end
end
16 changes: 5 additions & 11 deletions packages/sync-service/lib/electric/stack_supervisor/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ defmodule Electric.StackSupervisor.Telemetry do
end

def count_shapes(stack_id, _telemetry_opts) do
# Telemetry is started before everything else in the stack, so we need to handle
# the case where the shape cache is not started yet.
# Telemetry is started before everything else in the stack, so handle the case where the
# shape cache is not started yet: emit what we can and skip what we can't independently.
with %{total: num_shapes, indexed: indexed_shapes, unindexed: unindexed_shapes} <-
Electric.ShapeCache.shape_counts(stack_id) do
Electric.Telemetry.OpenTelemetry.execute(
Expand Down Expand Up @@ -104,16 +104,10 @@ defmodule Electric.StackSupervisor.Telemetry do
%{stack_id: stack_id}
)
catch
:exit, {:noproc, _} ->
:ok
kind, reason when kind in [:error, :exit] ->
Logger.debug("report_retained_wal_size/3 skipped: #{kind} #{inspect(reason)}")

# catch all errors to not log them as errors, those are reporing issues at best
type, reason ->
Logger.warning(
"Failed to query retained WAL size\nError: #{Exception.format(type, reason)}",
stack_id: stack_id,
slot_name: slot_name
)
:ok
end
end

Expand Down
Loading