diff --git a/.changeset/propagate-otel-context-to-snapshot-tasks.md b/.changeset/propagate-otel-context-to-snapshot-tasks.md new file mode 100644 index 0000000000..c3761b52f9 --- /dev/null +++ b/.changeset/propagate-otel-context-to-snapshot-tasks.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Propagate OpenTelemetry context into spawned snapshot and move-in tasks so that spans created via `with_child_span` (e.g. `shape_snapshot.execute_for_shape`, `shape_snapshot.query_fn`, `shape_snapshot.checkout_wait`) are linked to the originating trace instead of being silently dropped. diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index 5156dfa9da..7720a166c6 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -4,6 +4,7 @@ defmodule Electric.Shapes do alias Electric.ShapeCache alias Electric.ShapeCache.ShapeStatus alias Electric.Shapes.Shape + alias Electric.Telemetry.OpenTelemetry import Electric, only: [is_stack_id: 1, is_shape_handle: 1] @@ -65,7 +66,7 @@ defmodule Electric.Shapes do ShapeCache.get_or_create_shape_handle( shape_def, stack_id, - otel_ctx: :otel_ctx.get_current() + otel_ctx: OpenTelemetry.get_current_context() ) end diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 2917bd57c0..b97eb88861 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -39,7 +39,7 @@ defmodule Electric.Shapes.Consumer do @type initialize_shape_opts() :: %{ :action => :create | :restore, - optional(:otel_ctx) => map() | nil, + optional(:otel_ctx) => OpenTelemetry.otel_ctx() | nil, optional(:feature_flags) => [binary()], optional(:is_subquery_shape?) => boolean() } diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 1b5872e7b5..4a4e43b220 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -42,7 +42,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do storage: storage } = state - ctx_token = if not is_nil(state.otel_ctx), do: :otel_ctx.attach(state.otel_ctx) + if not is_nil(state.otel_ctx), do: OpenTelemetry.set_current_context(state.otel_ctx) result = case Shapes.Consumer.whereis(stack_id, shape_handle) do @@ -104,8 +104,6 @@ defmodule Electric.Shapes.Consumer.Snapshotter do {:stop, {:error, "consumer not found"}, state} end - if not is_nil(ctx_token), do: :otel_ctx.detach(ctx_token) - result end @@ -126,6 +124,11 @@ defmodule Electric.Shapes.Consumer.Snapshotter do # not an ephemeral unnamed task process db_pool = Electric.Connection.Manager.snapshot_pool(ctx.stack_id) + # Capture OTel context so spans created inside the spawned task are linked to + # the originating trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + trace_context = OpenTelemetry.get_current_context() + # We're looking to avoid saturating the DB connection pool with queries that are "bad" - those that don't start # returning any data (likely because they're not using an index). To acheive that, we're running the query in a task, # and waiting for the task to (a) send us a message that it's ready to stream and (b) send us a message when it sees any data @@ -137,6 +140,8 @@ defmodule Electric.Shapes.Consumer.Snapshotter do task = Task.Supervisor.async_nolink(supervisor, fn -> + OpenTelemetry.set_current_context(trace_context) + snapshot_fun = Electric.StackConfig.lookup(stack_id, :create_snapshot_fn, &stream_snapshot_from_db/5) diff --git a/packages/sync-service/lib/electric/shapes/partial_modes.ex b/packages/sync-service/lib/electric/shapes/partial_modes.ex index 53482d7d41..8a6f1f5109 100644 --- a/packages/sync-service/lib/electric/shapes/partial_modes.ex +++ b/packages/sync-service/lib/electric/shapes/partial_modes.ex @@ -95,7 +95,14 @@ defmodule Electric.Shapes.PartialModes do stack_id: opts[:stack_id] }) + # Propagate OTel context so spans created inside the task are linked to the + # caller's trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + trace_context = OpenTelemetry.get_current_context() + Task.Supervisor.start_child(supervisor, fn -> + OpenTelemetry.set_current_context(trace_context) + try do SnapshotQuery.execute_for_shape(pool, shape_handle, shape, stack_id: opts[:stack_id], @@ -127,7 +134,14 @@ defmodule Electric.Shapes.PartialModes do pool = Manager.pool_name(opts[:stack_id], :snapshot) results_fn = Access.fetch!(opts, :results_fn) + # Propagate OTel context so spans created inside the task are linked to the + # caller's trace. OTel context is per-process, so without this any + # `with_child_span` calls in the task would be silently dropped. + trace_context = OpenTelemetry.get_current_context() + Task.Supervisor.start_child(supervisor, fn -> + OpenTelemetry.set_current_context(trace_context) + try do SnapshotQuery.execute_for_shape(pool, shape_handle, shape, stack_id: opts[:stack_id], diff --git a/packages/sync-service/lib/electric/telemetry/open_telemetry.ex b/packages/sync-service/lib/electric/telemetry/open_telemetry.ex index 523607075d..c2ecbaa46b 100644 --- a/packages/sync-service/lib/electric/telemetry/open_telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry/open_telemetry.ex @@ -35,7 +35,12 @@ defmodule Electric.Telemetry.OpenTelemetry do @typep span_name :: String.t() @typep attr_name :: String.t() @typep span_attrs :: :opentelemetry.attributes_map() - @typep span_ctx :: :opentelemetry.span_ctx() + @type span_ctx :: :opentelemetry.span_ctx() + + @typedoc """ + Span + baggage pair returned by `get_current_context/0` and consumed by `set_current_context/1`. + """ + @type otel_ctx :: {span_ctx() | :undefined, :otel_baggage.t()} @doc """ Create a span that starts at the current point in time and ends when `fun` returns.