Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/tracestate-sample-rate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@core/sync-service": patch
---

Honor an upstream head-sampling rate hint received via the W3C `tracestate` header (`electric=rate:<N>`) on shape GET requests:

- When the remote parent trace is sampled, the `Plug_shape_get` root span (and the `shape_get.plug.stream_chunk` child spans) are stamped with Honeycomb's `SampleRate` attribute — `N` for responses with status < 500 and `1` for 5xx responses — so Honeycomb weights aggregates over Electric's spans by the upstream sampling rate instead of under-reporting traffic ~N-fold.

- When the remote parent trace is NOT sampled and the request ends in a 5xx response, a single root request span is now synthesized and exported with `SampleRate=1` in the same trace as the upstream's spans (same trace_id, parented on the remote span), so server-side error telemetry is no longer lost to upstream head-sampling. Unsampled successful requests still export nothing.

Requests without a remote trace context, or with a missing/invalid rate hint, behave exactly as before.
64 changes: 63 additions & 1 deletion packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Electric.Plug.ServeShapePlug do

use Plug.Builder

alias Electric.Plug.TraceContextPlug
alias Electric.ShapeCache
alias Electric.ShapeCache.ShapeStatus
alias Electric.Shapes.Api
Expand Down Expand Up @@ -469,7 +470,68 @@ defmodule Electric.Plug.ServeShapePlug do
}
)

add_span_attrs_from_conn(conn)
conn
|> add_span_attrs_from_conn()
|> stamp_sample_rate()
end

# Stamp Honeycomb's `SampleRate` attribute on the root request span and handle the
# error tail of remote-unsampled traces.
#
# The upstream proxy head-samples successful requests at 1:N (propagated to us via the
# `tracestate: electric=rate:N` hint) and keeps all >= 500 responses at rate 1. We
# mirror that here, per request:
#
# * remote parent sampled: the root span is recording — stamp `SampleRate` = N for
# status < 500 and `SampleRate` = 1 for status >= 500, so Honeycomb weights
# aggregates over Electric spans the same way it weights the worker's;
#
# * remote parent NOT sampled: the parent-based sampler dropped all spans for this
# request, which is the volume win for the (vast) majority of successful requests.
# But for status >= 500 we still want server-side telemetry: synthesize a single
# root span carrying the final request attributes with `SampleRate` = 1, parented
# onto the remote span so it lands in the same trace as the upstream's
# kept-on-error spans;
#
# * no remote parent (direct traffic): nothing to do — spans are recorded and
# exported unweighted, as before.
defp stamp_sample_rate(conn) do
case TraceContextPlug.trace_context(conn) do
%{parent_sampled?: true} ->
case TraceContextPlug.sample_rate_attrs(conn, conn.status) do
attrs when map_size(attrs) > 0 -> OpenTelemetry.add_span_attributes(attrs)
_ -> :ok
end

%{parent_sampled?: false, parent_span_ctx: parent_span_ctx} ->
if is_integer(conn.status) and conn.status >= 500 do
export_unsampled_error_span(conn, parent_span_ctx)
end

nil ->
:ok
end

conn
end

# The request ran under a remote-unsampled trace (no recording span exists) but ended
# in a 5xx — export one root span after the fact so the error is visible server-side.
# `SampleRate` is hardcoded to 1: error responses ignore the rate hint, mirroring the
# upstream's keep-all-errors-at-rate-1 semantics.
defp export_unsampled_error_span(conn, parent_span_ctx) do
attributes =
conn
|> open_telemetry_attrs()
|> Map.put(TraceContextPlug.sample_rate_attr(), 1)

OpenTelemetry.export_unsampled_remote_span(
"Plug_shape_get",
attributes,
parent_span_ctx,
start_time: get_in(conn.private, [:electric_telemetry_span, :start_time]),
error: conn.assigns[:error_str] || "HTTP #{conn.status}"
)
end

defp get_handle(%{response: %{shape_handle: shape_handle}}), do: shape_handle
Expand Down
96 changes: 95 additions & 1 deletion packages/sync-service/lib/electric/plug/trace_context_plug.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,48 @@
defmodule Electric.Plug.TraceContextPlug do
@moduledoc """
A plug that extracts trace context from incoming HTTP headers and sets it as the parent span.

In addition to the standard W3C `traceparent` extraction, this plug parses Electric's
sample-rate hint from the `tracestate` header. An upstream proxy (e.g. the Cloudflare
worker in front of Electric Cloud) that head-samples requests at a rate of 1:N tells us
about that rate via a tracestate member of the form:

tracestate: electric=rate:<N>

The hint, together with the remote parent span context and its sampled flag, is stored
in the conn so that downstream plugs can:

* stamp Honeycomb's `SampleRate` attribute on exported spans, making Honeycomb weight
aggregates by the upstream sampling rate (see `sample_rate_attrs/2`);

* export an error-tail span for 5xx responses even when the remote parent was not
sampled (see `Electric.Telemetry.OpenTelemetry.export_unsampled_remote_span/4`).

Hints that are missing, unparseable or have a rate below 1 are ignored.
"""

@behaviour Plug

alias Electric.Telemetry.OpenTelemetry

@private_key :electric_trace_context
@tracestate_key "electric"
@sample_rate_attr "SampleRate"

@typedoc """
Remote trace context extracted from the request headers.

* `:parent_span_ctx` - the span context extracted from `traceparent`
* `:parent_sampled?` - whether the remote parent has the W3C `sampled` flag set
* `:sample_rate_hint` - the upstream 1:N sampling rate parsed from `tracestate`,
or `nil` when absent/invalid
"""
@type trace_context :: %{
parent_span_ctx: OpenTelemetry.span_ctx(),
parent_sampled?: boolean(),
sample_rate_hint: pos_integer() | nil
}

def init(opts), do: opts

def call(%Plug.Conn{req_headers: headers} = conn, _opts) do
Expand All @@ -21,7 +59,63 @@ defmodule Electric.Plug.TraceContextPlug do
span_ctx ->
# Parent found, set as current span
:otel_tracer.set_current_span(span_ctx)
conn

Plug.Conn.put_private(conn, @private_key, %{
parent_span_ctx: span_ctx,
parent_sampled?: OpenTelemetry.span_ctx_sampled?(span_ctx),
sample_rate_hint: sample_rate_hint(span_ctx)
})
end
end

@doc """
The remote trace context extracted from the request headers by this plug, or `nil`
when the request did not carry a (valid) `traceparent` header.
"""
@spec trace_context(Plug.Conn.t()) :: trace_context() | nil
def trace_context(%Plug.Conn{private: private}), do: private[@private_key]

@doc """
Span attributes carrying Honeycomb's `SampleRate` for a response with the given status.

Honeycomb's OTLP ingest natively reads an integer span attribute named `SampleRate`
and weights aggregates by it. Successful responses inherit the upstream sampling rate
from the tracestate hint, while error (>= 500) responses are stamped with a rate of 1:
they mirror the upstream's keep-all-errors-at-rate-1 semantics.

Returns an empty map when the request carried no usable rate hint, leaving the spans
unweighted as before.
"""
@spec sample_rate_attrs(Plug.Conn.t(), integer() | nil) :: %{
optional(String.t()) => pos_integer()
}
def sample_rate_attrs(conn, status) do
case trace_context(conn) do
%{sample_rate_hint: rate} when is_integer(rate) ->
%{@sample_rate_attr => effective_sample_rate(rate, status)}

_ ->
%{}
end
end

defp effective_sample_rate(_rate, status) when is_integer(status) and status >= 500, do: 1
defp effective_sample_rate(rate, _status), do: rate

@doc """
The name of the span attribute Honeycomb reads as the sampling weight.
"""
@spec sample_rate_attr() :: String.t()
def sample_rate_attr, do: @sample_rate_attr

defp sample_rate_hint(span_ctx) do
with value when is_binary(value) <-
OpenTelemetry.tracestate_value(span_ctx, @tracestate_key),
"rate:" <> rate_str <- value,
{rate, ""} when rate >= 1 <- Integer.parse(rate_str) do
rate
else
_ -> nil
end
end
end
8 changes: 7 additions & 1 deletion packages/sync-service/lib/electric/shapes/api/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ defmodule Electric.Shapes.Api.Response do
validate_response_finalized!(response)

stack_id = Api.stack_id(response)

# Per-chunk spans inherit Honeycomb's SampleRate from the upstream tracestate hint
# (when present), same as the root request span — see
# Electric.Plug.TraceContextPlug.sample_rate_attrs/2.
sample_rate_attrs = Electric.Plug.TraceContextPlug.sample_rate_attrs(conn, status)

conn = Plug.Conn.send_chunked(conn, status)

{conn, bytes_sent} =
Expand All @@ -436,7 +442,7 @@ defmodule Electric.Shapes.Api.Response do

OpenTelemetry.with_span(
"shape_get.plug.stream_chunk",
[chunk_size: chunk_size],
Map.put(sample_rate_attrs, "chunk_size", chunk_size),
stack_id,
fn ->
case Plug.Conn.chunk(conn, chunk) do
Expand Down
78 changes: 78 additions & 0 deletions packages/sync-service/lib/electric/telemetry/open_telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ defmodule Electric.Telemetry.OpenTelemetry do
alias Electric.Telemetry.Sampler
alias Electric.Telemetry.IntervalTimer

require Record

Record.defrecordp(
:span_ctx_record,
:span_ctx,
Record.extract(:span_ctx, from_lib: "opentelemetry_api/include/opentelemetry.hrl")
)

@typep span_name :: String.t()
@typep attr_name :: String.t()
@typep span_attrs :: :opentelemetry.attributes_map()
Expand Down Expand Up @@ -376,6 +384,76 @@ defmodule Electric.Telemetry.OpenTelemetry do
Process.get(@interval_timer_key, [])
end

@doc """
Whether the W3C `sampled` flag is set in the trace flags of the given span context.
"""
@spec span_ctx_sampled?(span_ctx()) :: boolean()
def span_ctx_sampled?(span_ctx) when Record.is_record(span_ctx, :span_ctx) do
Bitwise.band(span_ctx_record(span_ctx, :trace_flags), 1) == 1
end

@doc """
Look up the value of the member with the given key in the W3C tracestate carried
by the given span context.

Returns the member's value as a string, or `nil` when the tracestate has no such
member.
"""
@spec tracestate_value(span_ctx(), String.t()) :: String.t() | nil
def tracestate_value(span_ctx, key) when Record.is_record(span_ctx, :span_ctx) do
case :otel_tracestate.get(key, span_ctx_record(span_ctx, :tracestate)) do
value when is_binary(value) and value != "" -> value
[_ | _] = value -> List.to_string(value)
_ -> nil
end
end

@doc """
Create and immediately export a finished span as a child of a remote *unsampled*
parent span context.

The parent-based OTel sampler drops every span of a trace whose remote parent has
the `sampled` flag unset, so by the time we decide that a request is worth
exporting after all (e.g. it resulted in a 5xx response), no recording span exists.
This function retroactively synthesizes one: it copies the remote parent span
context with the `sampled` trace flag forced on and starts a recording span under
that copy, so the parent-based sampler takes its `remote_parent_sampled` branch
(`always_on` by default). The synthesized span keeps the remote trace_id and uses
the remote span id as its parent, so it shows up in the same trace as the spans
recorded by the upstream service.

Options:

* `:start_time` - native monotonic timestamp for the span's start (defaults to
the current time, producing a zero-duration span)
* `:error` - a message to record as the span's error status
"""
@spec export_unsampled_remote_span(span_name(), span_attrs(), span_ctx(), keyword()) :: :ok
def export_unsampled_remote_span(name, attributes, parent_span_ctx, opts \\ [])
when Record.is_record(parent_span_ctx, :span_ctx) do
trace_flags = span_ctx_record(parent_span_ctx, :trace_flags)
sampled_parent = span_ctx_record(parent_span_ctx, trace_flags: Bitwise.bor(trace_flags, 1))

parent_ctx = :otel_tracer.set_current_span(:otel_ctx.new(), sampled_parent)

span_opts = %{
attributes: Map.new(attributes),
links: [],
is_recording: true,
start_time: opts[:start_time] || :opentelemetry.timestamp(),
kind: :internal
}

span_ctx = :otel_tracer.start_span(parent_ctx, tracer(), name, span_opts)

if message = opts[:error] do
:otel_span.set_status(span_ctx, :error, message)
end

:otel_span.end_span(span_ctx)
:ok
end

@doc """
Add an error event to the current span.
"""
Expand Down
Loading
Loading