Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/propagate-otel-context-to-snapshot-tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Propagate OpenTelemetry context into spawned snapshot and move-in tasks so that spans created via `with_child_span` (e.g. `shape_snapshot.execute_for_shape`, `shape_snapshot.query_fn`, `shape_snapshot.checkout_wait`) are linked to the originating trace instead of being silently dropped.
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.Shapes do
alias Electric.ShapeCache
alias Electric.ShapeCache.ShapeStatus
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry

import Electric, only: [is_stack_id: 1, is_shape_handle: 1]

Expand Down Expand Up @@ -65,7 +66,7 @@ defmodule Electric.Shapes do
ShapeCache.get_or_create_shape_handle(
shape_def,
stack_id,
otel_ctx: :otel_ctx.get_current()
otel_ctx: OpenTelemetry.get_current_context()
)
end

Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Electric.Shapes.Consumer do

@type initialize_shape_opts() :: %{
:action => :create | :restore,
optional(:otel_ctx) => map() | nil,
optional(:otel_ctx) => OpenTelemetry.otel_ctx() | nil,
optional(:feature_flags) => [binary()],
optional(:is_subquery_shape?) => boolean()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
storage: storage
} = state

ctx_token = if not is_nil(state.otel_ctx), do: :otel_ctx.attach(state.otel_ctx)
if not is_nil(state.otel_ctx), do: OpenTelemetry.set_current_context(state.otel_ctx)

result =
case Shapes.Consumer.whereis(stack_id, shape_handle) do
Expand Down Expand Up @@ -104,8 +104,6 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
{:stop, {:error, "consumer not found"}, state}
end

if not is_nil(ctx_token), do: :otel_ctx.detach(ctx_token)

result
end

Expand All @@ -126,6 +124,11 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
# not an ephemeral unnamed task process
db_pool = Electric.Connection.Manager.snapshot_pool(ctx.stack_id)

# Capture OTel context so spans created inside the spawned task are linked to
# the originating trace. OTel context is per-process, so without this any
# `with_child_span` calls in the task would be silently dropped.
trace_context = OpenTelemetry.get_current_context()

# We're looking to avoid saturating the DB connection pool with queries that are "bad" - those that don't start
# returning any data (likely because they're not using an index). To acheive that, we're running the query in a task,
# and waiting for the task to (a) send us a message that it's ready to stream and (b) send us a message when it sees any data
Expand All @@ -137,6 +140,8 @@ defmodule Electric.Shapes.Consumer.Snapshotter do

task =
Task.Supervisor.async_nolink(supervisor, fn ->
OpenTelemetry.set_current_context(trace_context)

snapshot_fun =
Electric.StackConfig.lookup(stack_id, :create_snapshot_fn, &stream_snapshot_from_db/5)

Expand Down
14 changes: 14 additions & 0 deletions packages/sync-service/lib/electric/shapes/partial_modes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ defmodule Electric.Shapes.PartialModes do
stack_id: opts[:stack_id]
})

# Propagate OTel context so spans created inside the task are linked to the
# caller's trace. OTel context is per-process, so without this any
# `with_child_span` calls in the task would be silently dropped.
trace_context = OpenTelemetry.get_current_context()

Task.Supervisor.start_child(supervisor, fn ->
OpenTelemetry.set_current_context(trace_context)

try do
SnapshotQuery.execute_for_shape(pool, shape_handle, shape,
stack_id: opts[:stack_id],
Expand Down Expand Up @@ -127,7 +134,14 @@ defmodule Electric.Shapes.PartialModes do
pool = Manager.pool_name(opts[:stack_id], :snapshot)
results_fn = Access.fetch!(opts, :results_fn)

# Propagate OTel context so spans created inside the task are linked to the
# caller's trace. OTel context is per-process, so without this any
# `with_child_span` calls in the task would be silently dropped.
trace_context = OpenTelemetry.get_current_context()

Task.Supervisor.start_child(supervisor, fn ->
OpenTelemetry.set_current_context(trace_context)

try do
SnapshotQuery.execute_for_shape(pool, shape_handle, shape,
stack_id: opts[:stack_id],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ defmodule Electric.Telemetry.OpenTelemetry do
@typep span_name :: String.t()
@typep attr_name :: String.t()
@typep span_attrs :: :opentelemetry.attributes_map()
@typep span_ctx :: :opentelemetry.span_ctx()
@type span_ctx :: :opentelemetry.span_ctx()

@typedoc """
Span + baggage pair returned by `get_current_context/0` and consumed by `set_current_context/1`.
"""
@type otel_ctx :: {span_ctx() | :undefined, :otel_baggage.t()}

@doc """
Create a span that starts at the current point in time and ends when `fun` returns.
Expand Down
Loading