Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/replication-introspection-error-handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"@core/sync-service": patch
---

Stop the ShapeLogCollector from crashing when table introspection fails on the hot replication path.

Previously the replication pipeline only handled `{:error, :connection_not_available}` from the inspector; every other legal inspector return crashed the ShapeLogCollector and took the whole shapes supervision tree down with it, putting the replication client into a 10-minute noproc retry loop:

- `:table_not_found` (table dropped or renamed between the WAL record and introspection) crashed `Partitions.handle_relation/2` and the pk-column lookup with a `CaseClauseError`. Relation messages for dropped tables are now ignored and changes for dropped tables are keyed on the full record, the same fallback used for tables without a primary key.
- In-band database errors (e.g. out-of-memory, `statement_timeout` cancelling the catalog query) crashed `Partitions.handle_relation/2` with a `CaseClauseError` and made `Partitions.add_shape/3` raise. They are now propagated as `{:error, reason}`; the collector logs a warning and pauses replication via the existing retry path until introspection succeeds.
- Connection-class errors returned in-band by Postgrex (e.g. `"ssl recv: closed"`) are now classified as `:connection_not_available` instead of leaking through as strings.
- Shape restore no longer crashes with a `MatchError` when the connection pool isn't ready yet; it retries the introspection in place and only gives up (with a descriptive error) after ~10 seconds.
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,28 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
{:ok, relations}

{:error, err} ->
{:error, Exception.message(err)}
normalize_query_error(err)
end
end

@doc false
# Maps an error returned in-band by a query onto the inspector's error
# contract. Postgrex hands connection-class failures (e.g. "ssl recv: closed")
# back as `{:error, %DBConnection.ConnectionError{}}` instead of raising them,
# so they have to be classified here; otherwise they would slip past the
# `:connection_not_available` mapping that callers rely on for retries.
@spec normalize_query_error(Exception.t()) ::
        {:error, String.t() | :connection_not_available}
def normalize_query_error(%DBConnection.ConnectionError{} = err) do
  case Electric.DbConnectionError.from_error(err).type do
    # Not a recognised connection failure: report its message verbatim.
    :unknown -> {:error, Exception.message(err)}
    _recognised -> {:error, :connection_not_available}
  end
end

# Every other exception is reported by its message.
def normalize_query_error(err), do: {:error, Exception.message(err)}

defp load_relation_query(match) do
# partitions can live in other namespaces from the parent/root table, so we
# need to keep track of them
Expand Down Expand Up @@ -187,7 +205,7 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
{:ok, rows}

{:error, err} ->
{:error, err}
normalize_query_error(err)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ defmodule Electric.Replication.ShapeLogCollector do
# we should skip this shape (and its children will also be skipped)
case DependencyLayers.add_dependency(layers, shape, shape_handle) do
{:ok, layers} ->
{:ok, partitions} = Partitions.add_shape(partitions, shape_handle, shape)
partitions = restore_partitions_for_shape(partitions, shape_handle, shape)

{
partitions,
Expand Down Expand Up @@ -315,6 +315,37 @@ defmodule Electric.Replication.ShapeLogCollector do
)
end

# Restoring a shape means introspecting its root table, and that can fail
# while the database connection pool is still coming up (or the database is
# otherwise unhealthy) — which is exactly the state we are likely to be in
# when restarting after a crash. The shape cannot simply be skipped: nothing
# re-registers restored shapes later, so a skipped shape would silently stop
# receiving updates. So the introspection is retried in place, and if the
# error persists we crash with a descriptive error so the supervisor restarts
# the restore.
@restore_retry_delay_ms 100
@restore_max_retries 100

# Returns the updated partitions once `Partitions.add_shape/3` succeeds.
# Makes at most `@restore_max_retries` attempts, sleeping
# `@restore_retry_delay_ms` between them, and raises when the last one fails.
defp restore_partitions_for_shape(partitions, shape_handle, shape, attempt \\ 1) do
  case Partitions.add_shape(partitions, shape_handle, shape) do
    {:ok, restored} ->
      restored

    {:error, reason} ->
      if attempt >= @restore_max_retries do
        raise "Failed to restore partition info for shape #{shape_handle}: #{inspect(reason)}"
      end

      # Only the first failure is logged, to avoid one warning per retry.
      if attempt == 1 do
        Logger.warning(
          "Retrying shape restore: failed to introspect #{Electric.Utils.inspect_relation(shape.root_table)}: #{inspect(reason)}",
          shape_handle: shape_handle
        )
      end

      Process.sleep(@restore_retry_delay_ms)
      restore_partitions_for_shape(partitions, shape_handle, shape, attempt + 1)
  end
end

def handle_call(:mark_as_ready, _from, state) do
offset =
case LsnTracker.get_last_processed_lsn(state.stack_id) do
Expand Down Expand Up @@ -420,8 +451,8 @@ defmodule Electric.Replication.ShapeLogCollector do
{state, Map.put(results, shape_handle, {:error, :missing_dependencies})}
end

{:error, :connection_not_available} ->
{state, Map.put(results, shape_handle, {:error, :connection_not_available})}
{:error, reason} ->
{state, Map.put(results, shape_handle, {:error, reason})}
end
end)

Expand Down Expand Up @@ -564,6 +595,18 @@ defmodule Electric.Replication.ShapeLogCollector do

{:error, :connection_not_available} ->
{{:error, :connection_not_available}, state}

{:error, reason} ->
# Introspection failed for a reason other than the connection pool
# being down, e.g. the database returned an error to the catalog
# query. Reply with the one error the replication client knows how to
# recover from: it pauses the stream and redelivers the event, giving
# the introspection another chance instead of crashing the collector.
Logger.warning(
"Failed to introspect relations affected by transaction #{txn_fragment.xid}: #{inspect(reason)}. Replication is paused until introspection succeeds"
)

{{:error, :connection_not_available}, state}
end
end

Expand Down Expand Up @@ -660,6 +703,18 @@ defmodule Electric.Replication.ShapeLogCollector do

{:error, :connection_not_available} ->
{{:error, :connection_not_available}, state}

{:error, reason} ->
# Introspection failed for a reason other than the connection pool
# being down, e.g. the database returned an error to the catalog
# query. Reply with the one error the replication client knows how to
# recover from: it pauses the stream and redelivers the event, giving
# the introspection another chance instead of crashing the collector.
Logger.warning(
"Failed to introspect relation #{Electric.Utils.inspect_relation({updated_rel.schema, updated_rel.table})}: #{inspect(reason)}. Replication is paused until introspection succeeds"
)

{{:error, :connection_not_available}, state}
end
end

Expand Down Expand Up @@ -799,6 +854,17 @@ defmodule Electric.Replication.ShapeLogCollector do
with {:ok, {oid, _}} <- Inspector.load_relation_oid(relation, state.inspector),
{:ok, info} <- Inspector.load_column_info(oid, state.inspector) do
{:ok, Inspector.get_pk_cols(info)}
else
:table_not_found ->
# The table was dropped (or renamed) after these changes were written
# to the WAL, so its primary key can no longer be introspected. Key
# the changes on the full record — the same fallback used for tables
# without a primary key — and leave handling of the dropped table to
# the affected shapes further down the stack.
{:ok, []}

{:error, reason} ->
{:error, reason}
end
end
end
21 changes: 11 additions & 10 deletions packages/sync-service/lib/electric/shapes/partitions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Electric.Shapes.Partitions do
partition root for every change to a partition of that root.
"""
@spec add_shape(t(), shape_id(), Electric.Shapes.Shape.t()) ::
{:ok, t()} | {:error, :connection_not_available}
{:ok, t()} | {:error, term()}
def add_shape(%__MODULE__{} = state, shape_id, shape) do
case Inspector.load_relation_info(shape.root_table_id, state.inspector) do
{:ok, relation} ->
Expand Down Expand Up @@ -69,13 +69,8 @@ defmodule Electric.Shapes.Partitions do
# run a snapshot against a non-existent table)
{:ok, state}

{:error, :connection_not_available} ->
{:error, :connection_not_available}

{:error, reason} ->
raise RuntimeError,
message:
"Unable to introspect table #{Electric.Utils.inspect_relation(shape.root_table)}: #{inspect(reason)}"
{:error, reason}
end
end

Expand Down Expand Up @@ -119,7 +114,7 @@ defmodule Electric.Shapes.Partitions do
Handle relation changes from the replication stream,
expanding changes to partitions into the partition root as appropriate.
"""
@spec handle_relation(t(), Relation.t()) :: {:ok, t()} | {:error, :connection_not_available}
@spec handle_relation(t(), Relation.t()) :: {:ok, t()} | {:error, term()}
def handle_relation(%__MODULE__{} = state, %Relation{} = relation) do
table = table(relation)

Expand All @@ -138,8 +133,14 @@ defmodule Electric.Shapes.Partitions do
{:ok, _} ->
{:ok, state}

{:error, :connection_not_available} ->
{:error, :connection_not_available}
:table_not_found ->
# the table was dropped (or dropped and recreated under a new oid)
# between the relation message being written to the WAL and us
# introspecting it, so there are no partitions left to track
{:ok, state}

{:error, reason} ->
{:error, reason}
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Electric.Postgres.Inspector.DirectInspectorTest do
  use ExUnit.Case, async: true

  alias Electric.Postgres.Inspector.DirectInspector

  describe "normalize_query_error/1" do
    test "classifies connection-class errors as :connection_not_available" do
      # Messages that are expected to be recognised as connection failures.
      messages = ["ssl recv: closed", "tcp recv: closed", "ssl connect: closed"]

      Enum.each(messages, fn message ->
        result =
          DirectInspector.normalize_query_error(%DBConnection.ConnectionError{message: message})

        assert {:error, :connection_not_available} = result
      end)
    end

    test "stringifies unknown connection errors" do
      result =
        DirectInspector.normalize_query_error(%DBConnection.ConnectionError{
          message: "something inexplicable"
        })

      assert {:error, "something inexplicable"} = result
    end

    test "stringifies server-side errors" do
      # A non-connection error, as reported by the database server itself.
      postgres_fields = %{
        code: :out_of_memory,
        pg_code: "53200",
        message: "out of memory",
        severity: "ERROR",
        unknown: "ERROR"
      }

      result = DirectInspector.normalize_query_error(%Postgrex.Error{postgres: postgres_fields})

      assert {:error, message} = result
      assert message =~ "out of memory"
      assert is_binary(message)
    end
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
end
end

describe "shape restoration with flaky introspection" do
  setup _ctx do
    # Shapes are restored as soon as the collector starts inside
    # setup_log_collector, so the inspector must already be patched by the
    # time that setup runs.
    failed_calls = :counters.new(1, [])

    # Fail the first two lookups with a connection error, then succeed.
    patch_calls(Electric.Postgres.Inspector, [],
      load_relation_info: fn 1234, _ ->
        case :counters.get(failed_calls, 1) do
          seen when seen < 2 ->
            :counters.add(failed_calls, 1, 1)
            {:error, :connection_not_available}

          _ ->
            {:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}}
        end
      end
    )

    :ok
  end

  setup :setup_log_collector

  @tag capture_log: true
  @tag restore_shapes: [{@shape_handle, @shape}], inspector: @inspector
  test "retries introspection until the connection becomes available", ctx do
    collector = GenServer.whereis(ShapeLogCollector.name(ctx.stack_id))

    assert is_pid(collector)
    assert Process.alive?(collector)
  end
end

describe "shape restoration" do
setup :setup_log_collector

Expand Down Expand Up @@ -812,6 +844,58 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
assert {:error, :connection_not_available} =
ShapeLogCollector.handle_event(txn, ctx.stack_id)
end

@tag capture_log: true
test "returns a retryable error and stays alive when column introspection fails unexpectedly",
     ctx do
  # The oid lookup succeeds, but loading the column info reports a database
  # error rather than a connection error.
  stub_inspector([force: true],
    load_relation_oid: fn {"public", "test_table"}, _ ->
      {:ok, {1234, {"public", "test_table"}}}
    end,
    load_column_info: fn 1234, _ ->
      {:error, "ERROR 53200 (out_of_memory) out of memory"}
    end
  )

  lsn = Lsn.from_integer(1)

  insert = %Changes.NewRecord{
    relation: {"public", "test_table"},
    record: %{"id" => "2", "name" => "foo"},
    log_offset: LogOffset.new(lsn, 0)
  }

  txn = complete_txn_fragment(100, lsn, [insert])

  assert {:error, :connection_not_available} =
           ShapeLogCollector.handle_event(txn, ctx.stack_id)

  collector = GenServer.whereis(ShapeLogCollector.name(ctx.stack_id))
  assert Process.alive?(collector)
end

test "processes the transaction when the table was dropped before introspection", ctx do
  # The inspector reports that the relation no longer exists.
  stub_inspector([force: true],
    load_relation_oid: fn {"public", "test_table"}, _ -> :table_not_found end
  )

  lsn = Lsn.from_integer(1)

  insert = %Changes.NewRecord{
    relation: {"public", "test_table"},
    record: %{"id" => "2", "name" => "foo"},
    log_offset: LogOffset.new(lsn, 0)
  }

  txn = complete_txn_fragment(100, lsn, [insert])

  assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)

  collector = GenServer.whereis(ShapeLogCollector.name(ctx.stack_id))
  assert Process.alive?(collector)
end
end

describe "handle_event/2 with relations" do
Expand Down Expand Up @@ -891,6 +975,24 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
Support.TransactionConsumer.assert_consume(ctx.consumers, [relation1, relation2])
end

@tag capture_log: true
test "returns a retryable error and stays alive when relation introspection fails unexpectedly",
     ctx do
  # Relation lookups report a database error rather than a connection error.
  stub_inspector([force: true],
    load_relation_info: fn 1234, _ ->
      {:error, "ERROR 53200 (out_of_memory) out of memory"}
    end,
    clean: fn _, _ -> :ok end
  )

  result =
    ShapeLogCollector.handle_event(
      %Relation{id: 1234, table: "test_table", schema: "public", columns: []},
      ctx.stack_id
    )

  assert {:error, :connection_not_available} = result

  collector = GenServer.whereis(ShapeLogCollector.name(ctx.stack_id))
  assert Process.alive?(collector)
end

test "retries changed relation after partition inspection connection error", ctx do
id = @shape.root_table_id
{:ok, partition_relation_info_calls} = Agent.start_link(fn -> 0 end)
Expand Down Expand Up @@ -1155,6 +1257,16 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
assert ShapeLogCollector.add_shape(ctx.stack_id, @shape_handle, @shape, :create) ==
{:error, :connection_not_available}
end

test "returns error when introspection fails unexpectedly", ctx do
  reason = "ERROR 53200 (out_of_memory) out of memory"
  stub_inspector(load_relation_info: fn _, _ -> {:error, reason} end)

  # The inspector's error is expected to come back unchanged.
  result = ShapeLogCollector.add_shape(ctx.stack_id, @shape_handle, @shape, :create)
  assert result == {:error, reason}

  collector = GenServer.whereis(ShapeLogCollector.name(ctx.stack_id))
  assert Process.alive?(collector)
end
end

describe "handle_event/2 with shapes with dependencies" do
Expand Down
Loading
Loading