From da0328692b94bb05bb8003717fa20bba19de94f0 Mon Sep 17 00:00:00 2001 From: Erik the Implementer Date: Wed, 22 Apr 2026 02:11:21 +0200 Subject: [PATCH 1/4] Propagate OTel context into spawned snapshot and move-in tasks `Task.Supervisor.async_nolink` / `start_child` start new Erlang processes that do not inherit the caller's OTel context. Spans created inside these tasks via `with_child_span` (e.g. `shape_snapshot.execute_for_shape` and its children) were therefore silently dropped on the initial-snapshot and move-in code paths, because `with_child_span` requires a parent span in the current process's context. Capture the context via `:otel_ctx.get_current()` before spawning and attach it inside the task closure with `:otel_ctx.attach/1`, mirroring the pattern already used for `state.otel_ctx` in the snapshotter's `handle_continue`. --- ...propagate-otel-context-to-snapshot-tasks.md | 5 +++++ .../electric/shapes/consumer/snapshotter.ex | 9 +++++++++ .../lib/electric/shapes/partial_modes.ex | 18 ++++++++++++++++++ 3 files changed, 32 insertions(+) create mode 100644 .changeset/propagate-otel-context-to-snapshot-tasks.md diff --git a/.changeset/propagate-otel-context-to-snapshot-tasks.md b/.changeset/propagate-otel-context-to-snapshot-tasks.md new file mode 100644 index 0000000000..c3761b52f9 --- /dev/null +++ b/.changeset/propagate-otel-context-to-snapshot-tasks.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Propagate OpenTelemetry context into spawned snapshot and move-in tasks so that spans created via `with_child_span` (e.g. `shape_snapshot.execute_for_shape`, `shape_snapshot.query_fn`, `shape_snapshot.checkout_wait`) are linked to the originating trace instead of being silently dropped. 
diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 1b5872e7b5..8d0543b1d3 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -126,6 +126,11 @@ defmodule Electric.Shapes.Consumer.Snapshotter do # not an ephemeral unnamed task process db_pool = Electric.Connection.Manager.snapshot_pool(ctx.stack_id) + # Capture OTel context so spans created inside the spawned task are linked to + # the originating trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + otel_ctx = :otel_ctx.get_current() + # We're looking to avoid saturating the DB connection pool with queries that are "bad" - those that don't start # returning any data (likely because they're not using an index). To acheive that, we're running the query in a task, # and waiting for the task to (a) send us a message that it's ready to stream and (b) send us a message when it sees any data @@ -137,6 +142,8 @@ defmodule Electric.Shapes.Consumer.Snapshotter do task = Task.Supervisor.async_nolink(supervisor, fn -> + ctx_token = :otel_ctx.attach(otel_ctx) + snapshot_fun = Electric.StackConfig.lookup(stack_id, :create_snapshot_fn, &stream_snapshot_from_db/5) @@ -155,6 +162,8 @@ defmodule Electric.Shapes.Consumer.Snapshotter do rescue error -> {:error, error, __STACKTRACE__} + after + :otel_ctx.detach(ctx_token) end end) diff --git a/packages/sync-service/lib/electric/shapes/partial_modes.ex b/packages/sync-service/lib/electric/shapes/partial_modes.ex index 53482d7d41..221e8da5d2 100644 --- a/packages/sync-service/lib/electric/shapes/partial_modes.ex +++ b/packages/sync-service/lib/electric/shapes/partial_modes.ex @@ -95,7 +95,14 @@ defmodule Electric.Shapes.PartialModes do stack_id: opts[:stack_id] }) + # Propagate OTel context so spans created inside the 
task are linked to the + # caller's trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + otel_ctx = :otel_ctx.get_current() + Task.Supervisor.start_child(supervisor, fn -> + ctx_token = :otel_ctx.attach(otel_ctx) + try do SnapshotQuery.execute_for_shape(pool, shape_handle, shape, stack_id: opts[:stack_id], @@ -116,6 +123,8 @@ defmodule Electric.Shapes.PartialModes do rescue error -> send(consumer_pid, {:query_move_in_error, opts[:move_in_name], error, __STACKTRACE__}) + after + :otel_ctx.detach(ctx_token) end end) @@ -127,7 +136,14 @@ defmodule Electric.Shapes.PartialModes do pool = Manager.pool_name(opts[:stack_id], :snapshot) results_fn = Access.fetch!(opts, :results_fn) + # Propagate OTel context so spans created inside the task are linked to the + # caller's trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + otel_ctx = :otel_ctx.get_current() + Task.Supervisor.start_child(supervisor, fn -> + ctx_token = :otel_ctx.attach(otel_ctx) + try do SnapshotQuery.execute_for_shape(pool, shape_handle, shape, stack_id: opts[:stack_id], @@ -146,6 +162,8 @@ defmodule Electric.Shapes.PartialModes do rescue error -> send(parent, {:query_move_in_error, opts[:move_in_name], error, __STACKTRACE__}) + after + :otel_ctx.detach(ctx_token) end end) From 576c2140cdd98819f0cabf1107cc662e05b046c8 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 22 Apr 2026 02:17:56 +0200 Subject: [PATCH 2/4] use OpenTelemetry context helpers instead of raw :otel_ctx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace :otel_ctx.get_current/attach/detach in PartialModes with OpenTelemetry.get_current_context/0 and set_current_context/1, matching the pattern already used in shape_log_collector.ex and consumer.ex. 
The helper pair just propagates the current span + baggage into the new process, which is all these short-lived tasks need — no detach dance required. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lib/electric/shapes/partial_modes.ex | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/partial_modes.ex b/packages/sync-service/lib/electric/shapes/partial_modes.ex index 221e8da5d2..8a6f1f5109 100644 --- a/packages/sync-service/lib/electric/shapes/partial_modes.ex +++ b/packages/sync-service/lib/electric/shapes/partial_modes.ex @@ -98,10 +98,10 @@ defmodule Electric.Shapes.PartialModes do # Propagate OTel context so spans created inside the task are linked to the # caller's trace. OTel context is per-process, so without this any # `with_child_span` calls in the task would be silently dropped. - otel_ctx = :otel_ctx.get_current() + trace_context = OpenTelemetry.get_current_context() Task.Supervisor.start_child(supervisor, fn -> - ctx_token = :otel_ctx.attach(otel_ctx) + OpenTelemetry.set_current_context(trace_context) try do SnapshotQuery.execute_for_shape(pool, shape_handle, shape, @@ -123,8 +123,6 @@ defmodule Electric.Shapes.PartialModes do rescue error -> send(consumer_pid, {:query_move_in_error, opts[:move_in_name], error, __STACKTRACE__}) - after - :otel_ctx.detach(ctx_token) end end) @@ -139,10 +137,10 @@ defmodule Electric.Shapes.PartialModes do # Propagate OTel context so spans created inside the task are linked to the # caller's trace. OTel context is per-process, so without this any # `with_child_span` calls in the task would be silently dropped. 
- otel_ctx = :otel_ctx.get_current() + trace_context = OpenTelemetry.get_current_context() Task.Supervisor.start_child(supervisor, fn -> - ctx_token = :otel_ctx.attach(otel_ctx) + OpenTelemetry.set_current_context(trace_context) try do SnapshotQuery.execute_for_shape(pool, shape_handle, shape, @@ -162,8 +160,6 @@ defmodule Electric.Shapes.PartialModes do rescue error -> send(parent, {:query_move_in_error, opts[:move_in_name], error, __STACKTRACE__}) - after - :otel_ctx.detach(ctx_token) end end) From 0103b766c9a5f4dbc385cd70fec715fd103d3b5f Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 22 Apr 2026 02:21:50 +0200 Subject: [PATCH 3/4] replace :otel_ctx usage in Snapshotter and Shapes Follow the same pattern as the previous commit and shape_log_collector/consumer: use OpenTelemetry.get_current_context/0 and set_current_context/1 helpers instead of raw :otel_ctx.get_current/attach/detach. Drops the detach dance for both the handle_continue entry in Snapshotter and the nested Task in start_streaming_snapshot_from_db, and updates the producer in Shapes.get_or_create_shape_handle to capture the context via the same helper. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/sync-service/lib/electric/shapes.ex | 3 ++- .../lib/electric/shapes/consumer/snapshotter.ex | 10 +++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index 5156dfa9da..7720a166c6 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -4,6 +4,7 @@ defmodule Electric.Shapes do alias Electric.ShapeCache alias Electric.ShapeCache.ShapeStatus alias Electric.Shapes.Shape + alias Electric.Telemetry.OpenTelemetry import Electric, only: [is_stack_id: 1, is_shape_handle: 1] @@ -65,7 +66,7 @@ defmodule Electric.Shapes do ShapeCache.get_or_create_shape_handle( shape_def, stack_id, - otel_ctx: :otel_ctx.get_current() + otel_ctx: OpenTelemetry.get_current_context() ) end diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 8d0543b1d3..4a4e43b220 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -42,7 +42,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do storage: storage } = state - ctx_token = if not is_nil(state.otel_ctx), do: :otel_ctx.attach(state.otel_ctx) + if not is_nil(state.otel_ctx), do: OpenTelemetry.set_current_context(state.otel_ctx) result = case Shapes.Consumer.whereis(stack_id, shape_handle) do @@ -104,8 +104,6 @@ defmodule Electric.Shapes.Consumer.Snapshotter do {:stop, {:error, "consumer not found"}, state} end - if not is_nil(ctx_token), do: :otel_ctx.detach(ctx_token) - result end @@ -129,7 +127,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do # Capture OTel context so spans created inside the spawned task are linked to # the originating trace. 
OTel context is per-process, so without this any # `with_child_span` calls in the task would be silently dropped. - otel_ctx = :otel_ctx.get_current() + trace_context = OpenTelemetry.get_current_context() # We're looking to avoid saturating the DB connection pool with queries that are "bad" - those that don't start # returning any data (likely because they're not using an index). To acheive that, we're running the query in a task, @@ -142,7 +140,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do task = Task.Supervisor.async_nolink(supervisor, fn -> - ctx_token = :otel_ctx.attach(otel_ctx) + OpenTelemetry.set_current_context(trace_context) snapshot_fun = Electric.StackConfig.lookup(stack_id, :create_snapshot_fn, &stream_snapshot_from_db/5) @@ -162,8 +160,6 @@ defmodule Electric.Shapes.Consumer.Snapshotter do rescue error -> {:error, error, __STACKTRACE__} - after - :otel_ctx.detach(ctx_token) end end) From b230e901b04ea76a0acbb1054ede3dd5ccd86fa8 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Wed, 22 Apr 2026 15:07:32 +0200 Subject: [PATCH 4/4] add OpenTelemetry.otel_ctx type and use it in initialize_shape_opts Follows up on the reviewer's suggestion: `get_current_context/0` returns a {span_ctx, baggage} tuple, not a map. Expose an `otel_ctx` @type on the OpenTelemetry module and reference it from `Consumer.initialize_shape_opts` so the spec matches the real shape of the value being carried. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/sync-service/lib/electric/shapes/consumer.ex | 2 +- .../sync-service/lib/electric/telemetry/open_telemetry.ex | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 2917bd57c0..b97eb88861 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -39,7 +39,7 @@ defmodule Electric.Shapes.Consumer do @type initialize_shape_opts() :: %{ :action => :create | :restore, - optional(:otel_ctx) => map() | nil, + optional(:otel_ctx) => OpenTelemetry.otel_ctx() | nil, optional(:feature_flags) => [binary()], optional(:is_subquery_shape?) => boolean() } diff --git a/packages/sync-service/lib/electric/telemetry/open_telemetry.ex b/packages/sync-service/lib/electric/telemetry/open_telemetry.ex index 523607075d..c2ecbaa46b 100644 --- a/packages/sync-service/lib/electric/telemetry/open_telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry/open_telemetry.ex @@ -35,7 +35,12 @@ defmodule Electric.Telemetry.OpenTelemetry do @typep span_name :: String.t() @typep attr_name :: String.t() @typep span_attrs :: :opentelemetry.attributes_map() - @typep span_ctx :: :opentelemetry.span_ctx() + @type span_ctx :: :opentelemetry.span_ctx() + + @typedoc """ + Span + baggage pair returned by `get_current_context/0` and consumed by `set_current_context/1`. + """ + @type otel_ctx :: {span_ctx() | :undefined, :otel_baggage.t()} @doc """ Create a span that starts at the current point in time and ends when `fun` returns.