diff --git a/.changeset/replication-introspection-error-handling.md b/.changeset/replication-introspection-error-handling.md new file mode 100644 index 0000000000..6f14abe326 --- /dev/null +++ b/.changeset/replication-introspection-error-handling.md @@ -0,0 +1,12 @@ +--- +"@core/sync-service": patch +--- + +Stop the ShapeLogCollector from crashing when table introspection fails on the hot replication path. + +Previously the replication pipeline only handled `{:error, :connection_not_available}` from the inspector; every other legal inspector return value crashed the ShapeLogCollector and took the whole shapes supervision tree down with it, putting the replication client into a 10-minute noproc retry loop: + +- `:table_not_found` (table dropped or renamed between the WAL record and introspection) crashed `Partitions.handle_relation/2` and the pk-column lookup with a `CaseClauseError`. Relation messages for dropped tables are now ignored and changes for dropped tables are keyed on the full record, the same fallback used for tables without a primary key. +- In-band database errors (e.g. out-of-memory, `statement_timeout` cancelling the catalog query) crashed `Partitions.handle_relation/2` with a `CaseClauseError` and made `Partitions.add_shape/3` raise. They are now propagated as `{:error, reason}`; the collector logs a warning and pauses replication via the existing retry path until introspection succeeds. +- Connection-class errors returned in-band by Postgrex (e.g. `"ssl recv: closed"`) are now classified as `:connection_not_available` instead of leaking through as strings or raw exception structs. +- Shape restore no longer crashes with a `MatchError` when the connection pool isn't ready yet; it retries the introspection in place and only gives up (with a descriptive error) after ~10 seconds. 
diff --git a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex index 9f228057ac..f6e1b1d94d 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex @@ -95,10 +95,28 @@ defmodule Electric.Postgres.Inspector.DirectInspector do {:ok, relations} {:error, err} -> - {:error, Exception.message(err)} + normalize_query_error(err) end end + @doc false + # Convert an in-band query error into the inspector error contract. + # Connection-class errors (e.g. "ssl recv: closed") are returned by + # Postgrex as `{:error, %DBConnection.ConnectionError{}}` rather than + # raised, so without this they would bypass the `:connection_not_available` + # mapping that callers rely on for retries. + @spec normalize_query_error(Exception.t()) :: + {:error, String.t() | :connection_not_available} + def normalize_query_error(%DBConnection.ConnectionError{} = err) do + if Electric.DbConnectionError.from_error(err).type != :unknown do + {:error, :connection_not_available} + else + {:error, Exception.message(err)} + end + end + + def normalize_query_error(err), do: {:error, Exception.message(err)} + defp load_relation_query(match) do # partitions can live in other namespaces from the parent/root table, so we # need to keep track of them @@ -187,7 +205,7 @@ defmodule Electric.Postgres.Inspector.DirectInspector do {:ok, rows} {:error, err} -> - {:error, err} + normalize_query_error(err) end end diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index 463a7ed8cc..71c23050df 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -279,7 +279,7 @@ defmodule 
Electric.Replication.ShapeLogCollector do # we should skip this shape (and its children will also be skipped) case DependencyLayers.add_dependency(layers, shape, shape_handle) do {:ok, layers} -> - {:ok, partitions} = Partitions.add_shape(partitions, shape_handle, shape) + partitions = restore_partitions_for_shape(partitions, shape_handle, shape) { partitions, @@ -315,6 +315,37 @@ defmodule Electric.Replication.ShapeLogCollector do ) end + # Restoring a shape requires introspecting its root table, which can fail + # while the database connection pool is still coming up (or the database is + # otherwise unhealthy) — exactly the situation we're likely to be in when + # restarting after a crash. Skipping the shape is not an option: nothing + # re-registers restored shapes later, so a skipped shape would silently stop + # receiving updates. Instead retry in place and, if the error persists, + # crash with a descriptive error so the supervisor restarts the restore. + @restore_retry_delay_ms 100 + @restore_max_retries 100 + + defp restore_partitions_for_shape(partitions, shape_handle, shape, attempt \\ 1) do + case Partitions.add_shape(partitions, shape_handle, shape) do + {:ok, partitions} -> + partitions + + {:error, reason} when attempt >= @restore_max_retries -> + raise "Failed to restore partition info for shape #{shape_handle}: #{inspect(reason)}" + + {:error, reason} -> + if attempt == 1 do + Logger.warning( + "Retrying shape restore: failed to introspect #{Electric.Utils.inspect_relation(shape.root_table)}: #{inspect(reason)}", + shape_handle: shape_handle + ) + end + + Process.sleep(@restore_retry_delay_ms) + restore_partitions_for_shape(partitions, shape_handle, shape, attempt + 1) + end + end + def handle_call(:mark_as_ready, _from, state) do offset = case LsnTracker.get_last_processed_lsn(state.stack_id) do @@ -420,8 +451,8 @@ defmodule Electric.Replication.ShapeLogCollector do {state, Map.put(results, shape_handle, {:error, :missing_dependencies})} end - 
{:error, :connection_not_available} -> - {state, Map.put(results, shape_handle, {:error, :connection_not_available})} + {:error, reason} -> + {state, Map.put(results, shape_handle, {:error, reason})} end end) @@ -564,6 +595,18 @@ defmodule Electric.Replication.ShapeLogCollector do {:error, :connection_not_available} -> {{:error, :connection_not_available}, state} + + {:error, reason} -> + # Introspection failed for a reason other than the connection pool + # being down, e.g. the database returned an error to the catalog + # query. Reply with the one error the replication client knows how to + # recover from: it pauses the stream and redelivers the event, giving + # the introspection another chance instead of crashing the collector. + Logger.warning( + "Failed to introspect relations affected by transaction #{txn_fragment.xid}: #{inspect(reason)}. Replication is paused until introspection succeeds" + ) + + {{:error, :connection_not_available}, state} end end @@ -660,6 +703,18 @@ defmodule Electric.Replication.ShapeLogCollector do {:error, :connection_not_available} -> {{:error, :connection_not_available}, state} + + {:error, reason} -> + # Introspection failed for a reason other than the connection pool + # being down, e.g. the database returned an error to the catalog + # query. Reply with the one error the replication client knows how to + # recover from: it pauses the stream and redelivers the event, giving + # the introspection another chance instead of crashing the collector. + Logger.warning( + "Failed to introspect relation #{Electric.Utils.inspect_relation({updated_rel.schema, updated_rel.table})}: #{inspect(reason)}. 
Replication is paused until introspection succeeds" + ) + + {{:error, :connection_not_available}, state} end end @@ -799,6 +854,17 @@ defmodule Electric.Replication.ShapeLogCollector do with {:ok, {oid, _}} <- Inspector.load_relation_oid(relation, state.inspector), {:ok, info} <- Inspector.load_column_info(oid, state.inspector) do {:ok, Inspector.get_pk_cols(info)} + else + :table_not_found -> + # The table was dropped (or renamed) after these changes were written + # to the WAL, so its primary key can no longer be introspected. Key + # the changes on the full record — the same fallback used for tables + # without a primary key — and leave handling of the dropped table to + # the affected shapes further down the stack. + {:ok, []} + + {:error, reason} -> + {:error, reason} end end end diff --git a/packages/sync-service/lib/electric/shapes/partitions.ex b/packages/sync-service/lib/electric/shapes/partitions.ex index aa31b94e11..6af82f3818 100644 --- a/packages/sync-service/lib/electric/shapes/partitions.ex +++ b/packages/sync-service/lib/electric/shapes/partitions.ex @@ -36,7 +36,7 @@ defmodule Electric.Shapes.Partitions do partition root for every change to a partition of that root. 
""" @spec add_shape(t(), shape_id(), Electric.Shapes.Shape.t()) :: - {:ok, t()} | {:error, :connection_not_available} + {:ok, t()} | {:error, term()} def add_shape(%__MODULE__{} = state, shape_id, shape) do case Inspector.load_relation_info(shape.root_table_id, state.inspector) do {:ok, relation} -> @@ -69,13 +69,8 @@ defmodule Electric.Shapes.Partitions do # run a snapshot against a non-existent table) {:ok, state} - {:error, :connection_not_available} -> - {:error, :connection_not_available} - {:error, reason} -> - raise RuntimeError, - message: - "Unable to introspect table #{Electric.Utils.inspect_relation(shape.root_table)}: #{inspect(reason)}" + {:error, reason} end end @@ -119,7 +114,7 @@ defmodule Electric.Shapes.Partitions do Handle relation changes from the replication stream, expanding changes to partitions into the partition root as appropriate. """ - @spec handle_relation(t(), Relation.t()) :: {:ok, t()} | {:error, :connection_not_available} + @spec handle_relation(t(), Relation.t()) :: {:ok, t()} | {:error, term()} def handle_relation(%__MODULE__{} = state, %Relation{} = relation) do table = table(relation) @@ -138,8 +133,14 @@ defmodule Electric.Shapes.Partitions do {:ok, _} -> {:ok, state} - {:error, :connection_not_available} -> - {:error, :connection_not_available} + :table_not_found -> + # the table was dropped (or dropped and recreated under a new oid) + # between the relation message being written to the WAL and us + # introspecting it, so there are no partitions left to track + {:ok, state} + + {:error, reason} -> + {:error, reason} end end diff --git a/packages/sync-service/test/electric/postgres/inspector/direct_inspector_test.exs b/packages/sync-service/test/electric/postgres/inspector/direct_inspector_test.exs new file mode 100644 index 0000000000..5b85897ab3 --- /dev/null +++ b/packages/sync-service/test/electric/postgres/inspector/direct_inspector_test.exs @@ -0,0 +1,38 @@ +defmodule Electric.Postgres.Inspector.DirectInspectorTest do + 
use ExUnit.Case, async: true + + alias Electric.Postgres.Inspector.DirectInspector + + describe "normalize_query_error/1" do + test "classifies connection-class errors as :connection_not_available" do + for message <- ["ssl recv: closed", "tcp recv: closed", "ssl connect: closed"] do + error = %DBConnection.ConnectionError{message: message} + + assert {:error, :connection_not_available} = + DirectInspector.normalize_query_error(error) + end + end + + test "stringifies unknown connection errors" do + error = %DBConnection.ConnectionError{message: "something inexplicable"} + + assert {:error, "something inexplicable"} = DirectInspector.normalize_query_error(error) + end + + test "stringifies server-side errors" do + error = %Postgrex.Error{ + postgres: %{ + code: :out_of_memory, + pg_code: "53200", + message: "out of memory", + severity: "ERROR", + unknown: "ERROR" + } + } + + assert {:error, message} = DirectInspector.normalize_query_error(error) + assert message =~ "out of memory" + assert is_binary(message) + end + end +end diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index fb14346b77..1cc95beca4 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -117,6 +117,38 @@ defmodule Electric.Replication.ShapeLogCollectorTest do end end + describe "shape restoration with flaky introspection" do + setup _ctx do + # The collector restores shapes as soon as it starts inside + # setup_log_collector, so the inspector has to be patched before that + # setup runs. 
+ attempts = :counters.new(1, []) + + patch_calls(Electric.Postgres.Inspector, [], + load_relation_info: fn 1234, _ -> + if :counters.get(attempts, 1) < 2 do + :counters.add(attempts, 1, 1) + {:error, :connection_not_available} + else + {:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}} + end + end + ) + + :ok + end + + setup :setup_log_collector + + @tag capture_log: true + @tag restore_shapes: [{@shape_handle, @shape}], inspector: @inspector + test "retries introspection until the connection becomes available", ctx do + pid = ShapeLogCollector.name(ctx.stack_id) |> GenServer.whereis() + assert is_pid(pid) + assert Process.alive?(pid) + end + end + describe "shape restoration" do setup :setup_log_collector @@ -812,6 +844,58 @@ defmodule Electric.Replication.ShapeLogCollectorTest do assert {:error, :connection_not_available} = ShapeLogCollector.handle_event(txn, ctx.stack_id) end + + @tag capture_log: true + test "returns a retryable error and stays alive when column introspection fails unexpectedly", + ctx do + stub_inspector([force: true], + load_relation_oid: fn {"public", "test_table"}, _ -> + {:ok, {1234, {"public", "test_table"}}} + end, + load_column_info: fn 1234, _ -> + {:error, "ERROR 53200 (out_of_memory) out of memory"} + end + ) + + lsn = Lsn.from_integer(1) + log_offset = LogOffset.new(lsn, 0) + + txn = + complete_txn_fragment(100, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2", "name" => "foo"}, + log_offset: log_offset + } + ]) + + assert {:error, :connection_not_available} = + ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert ShapeLogCollector.name(ctx.stack_id) |> GenServer.whereis() |> Process.alive?() + end + + test "processes the transaction when the table was dropped before introspection", ctx do + stub_inspector([force: true], + load_relation_oid: fn {"public", "test_table"}, _ -> :table_not_found end + ) + + lsn = Lsn.from_integer(1) + log_offset = 
LogOffset.new(lsn, 0) + + txn = + complete_txn_fragment(100, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "2", "name" => "foo"}, + log_offset: log_offset + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert ShapeLogCollector.name(ctx.stack_id) |> GenServer.whereis() |> Process.alive?() + end end describe "handle_event/2 with relations" do @@ -891,6 +975,24 @@ defmodule Electric.Replication.ShapeLogCollectorTest do Support.TransactionConsumer.assert_consume(ctx.consumers, [relation1, relation2]) end + @tag capture_log: true + test "returns a retryable error and stays alive when relation introspection fails unexpectedly", + ctx do + stub_inspector([force: true], + load_relation_info: fn 1234, _ -> + {:error, "ERROR 53200 (out_of_memory) out of memory"} + end, + clean: fn _, _ -> :ok end + ) + + relation = %Relation{id: 1234, table: "test_table", schema: "public", columns: []} + + assert {:error, :connection_not_available} = + ShapeLogCollector.handle_event(relation, ctx.stack_id) + + assert ShapeLogCollector.name(ctx.stack_id) |> GenServer.whereis() |> Process.alive?() + end + test "retries changed relation after partition inspection connection error", ctx do id = @shape.root_table_id {:ok, partition_relation_info_calls} = Agent.start_link(fn -> 0 end) @@ -1155,6 +1257,16 @@ defmodule Electric.Replication.ShapeLogCollectorTest do assert ShapeLogCollector.add_shape(ctx.stack_id, @shape_handle, @shape, :create) == {:error, :connection_not_available} end + + test "returns error when introspection fails unexpectedly", ctx do + error = "ERROR 53200 (out_of_memory) out of memory" + stub_inspector(load_relation_info: fn _, _ -> {:error, error} end) + + assert ShapeLogCollector.add_shape(ctx.stack_id, @shape_handle, @shape, :create) == + {:error, error} + + assert ShapeLogCollector.name(ctx.stack_id) |> GenServer.whereis() |> Process.alive?() + end end describe "handle_event/2 with shapes with 
dependencies" do diff --git a/packages/sync-service/test/electric/shapes/partitions_test.exs b/packages/sync-service/test/electric/shapes/partitions_test.exs index a8a97dc0db..435e4858f3 100644 --- a/packages/sync-service/test/electric/shapes/partitions_test.exs +++ b/packages/sync-service/test/electric/shapes/partitions_test.exs @@ -288,4 +288,32 @@ defmodule Electric.Shapes.PartitionsTest do Partitions.handle_txn_fragment(ctx.partitions, batch) end end + + describe "introspection failures" do + defmodule ConstInspector do + def load_relation_info(_oid, response), do: response + end + + test "handle_relation/2 ignores relations whose table no longer exists" do + partitions = Partitions.new(inspector: {ConstInspector, :table_not_found}) + relation = %Relation{id: 1, schema: "public", table: "dropped", columns: []} + + assert {:ok, %Partitions{}} = Partitions.handle_relation(partitions, relation) + end + + test "handle_relation/2 returns unexpected introspection errors" do + error = "ERROR 53200 (out_of_memory) out of memory" + partitions = Partitions.new(inspector: {ConstInspector, {:error, error}}) + relation = %Relation{id: 1, schema: "public", table: "a_table", columns: []} + + assert {:error, ^error} = Partitions.handle_relation(partitions, relation) + end + + test "add_shape/3 returns unexpected introspection errors instead of raising" do + shape = Shape.new!("partitioned", inspector: @inspector) + partitions = Partitions.new(inspector: {ConstInspector, {:error, "boom"}}) + + assert {:error, "boom"} = Partitions.add_shape(partitions, "s1", shape) + end + end end