From c8859cac9682cc4f6a4858ead52f85e282b4ff60 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Thu, 21 May 2026 13:14:31 +0100 Subject: [PATCH] Open per-query tracing span in QueryLogger MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit QueryLogger now opens a hardcoded INFO-level `db.query` span in `new()` that closes on drop, with field names following the OTel database span semantic conventions (`db.system.name`, `db.query.text`, `db.response.returned_rows`, `db.response.affected_rows`, `otel.kind = "client"`). The existing close-time event is emitted inside the span via `Span::in_scope` so callers get both — the span for OTel correlation, the event for the existing `rows_affected`/`elapsed_secs` fields under whatever level `LogSettings` configures. `QueryLogger::new` keeps its old signature; drivers attach the system name via the additive `with_db_system_name` builder, which records the `db.system.name` attribute on the already-open span. QueryLogger stores `Span`, not `EnteredSpan`, so it stays `Send` across the postgres/mysql `try_stream!` paths that broke #3176. A small `InstrumentedStream` wrapper (using `pin-project-lite`, already present transitively via the async runtimes) enters the span on each `poll_next` and drops the guard before returning, so no guard is held across an await. SQLite's synchronous `Iterator` path just relies on the close-event in-scope emission. Design constraints come from abonander's 2026-04-14 review on #3313: current OTel semconv field names, no `tracing::enabled!()` (broken per tokio-rs/tracing#2448), hardcoded verbosity rather than runtime- configurable span levels. --- Cargo.lock | 1 + sqlx-core/Cargo.toml | 1 + sqlx-core/src/logger.rs | 315 ++++++++++++++++++++--- sqlx-mysql/src/connection/executor.rs | 12 +- sqlx-postgres/src/connection/executor.rs | 12 +- sqlx-sqlite/src/connection/execute.rs | 2 +- 6 files changed, 302 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66d1bd7a60..a23433e762 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3574,6 +3574,7 @@ dependencies = [ "memchr", "native-tls", "percent-encoding", + "pin-project-lite", "rust_decimal", "rustls", "rustls-native-certs", diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index c935f5dcc2..3133e34b07 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -88,6 +88,7 @@ futures-util = { version = "0.3.32", default-features = false, features = ["allo log = { version = "0.4.18", default-features = false } memchr = { version = "2.5.0", default-features = false } percent-encoding = "2.3.0" +pin-project-lite = "0.2.13" serde = { version = "1.0.219", features = ["derive", "rc"], optional = true } serde_json = { version = "1.0.142", features = ["raw_value"], optional = true } toml = { version = "0.8.16", optional = true } diff --git a/sqlx-core/src/logger.rs b/sqlx-core/src/logger.rs index 424639eddf..0cf094644b 100644 --- a/sqlx-core/src/logger.rs +++ b/sqlx-core/src/logger.rs @@ -1,6 +1,12 @@ use crate::{connection::LogSettings, sql_str::SqlStr}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Instant; +use futures_core::Stream; +use pin_project_lite::pin_project; +use tracing::Span; + // Yes these look silly. `tracing` doesn't currently support dynamic levels // https://github.com/tokio-rs/tracing/issues/372 #[doc(hidden)] @@ -66,19 +72,58 @@ pub struct QueryLogger { rows_affected: u64, start: Instant, settings: LogSettings, + span: Span, } impl QueryLogger { pub fn new(sql: SqlStr, settings: LogSettings) -> Self { + // Hardcoded INFO level per maintainer review of #3313: libraries should pick a + // level and let consumers filter via `EnvFilter`. Field names follow the OTel + // database span semantic conventions + // (https://opentelemetry.io/docs/specs/semconv/database/database-spans/). + // `otel.kind = "client"` is the magic field that `tracing-opentelemetry` reads + // to set the exported `SpanKind`. `db.system.name` is declared empty here and + // filled in by drivers via `with_db_system_name`, so adding the field doesn't + // force a signature break on `QueryLogger::new`. + let summary = parse_query_summary(sql.as_str()); + let operation = summary + .split_whitespace() + .next() + .map(str::to_owned) + .unwrap_or_default(); + let span = tracing::info_span!( + target: "sqlx::query", + "db.query", + "db.system.name" = tracing::field::Empty, + "db.operation.name" = operation, + "db.query.summary" = summary, + "db.query.text" = sql.as_str(), + "db.response.returned_rows" = tracing::field::Empty, + "db.response.affected_rows" = tracing::field::Empty, + "otel.kind" = "client", + ); + Self { sql, rows_returned: 0, rows_affected: 0, start: Instant::now(), settings, + span, } } + /// Records the OTel `db.system.name` attribute on the query span. + /// + /// Drivers should call this with their canonical OTel system identifier + /// (`"postgresql"`, `"mysql"`, `"sqlite"`, etc. — see the OTel database span + /// semantic conventions). Separate from `new` so adding the field doesn't break + /// callers that construct `QueryLogger` directly. + pub fn with_db_system_name(self, name: &'static str) -> Self { + self.span.record("db.system.name", name); + self + } + pub fn increment_rows_returned(&mut self) { self.rows_returned += 1; } @@ -91,9 +136,25 @@ impl QueryLogger { &self.sql } + /// Clone the span attached to this query. + /// + /// Use with [`InstrumentedStream`] (or `Future::instrument` for plain futures) to + /// attribute child events emitted during query execution to the query's span. The + /// `Span` is `Send`; never store an `EnteredSpan` here (see #3176). + pub fn span(&self) -> Span { + self.span.clone() + } + pub fn finish(&self) { let elapsed = self.start.elapsed(); + // Record the per-query result counts on the span before it closes so OTel + // exporters see them as span attributes. + self.span + .record("db.response.returned_rows", self.rows_returned); + self.span + .record("db.response.affected_rows", self.rows_affected); + let was_slow = elapsed >= self.settings.slow_statements_duration; let lvl = if was_slow { @@ -117,38 +178,43 @@ impl QueryLogger { String::new() }; - if was_slow { - private_tracing_dynamic_event!( - target: "sqlx::query", - tracing_level, - summary, - db.statement = sql, - rows_affected = self.rows_affected, - rows_returned = self.rows_returned, - // Human-friendly - includes units (usually ms). Also kept for backward compatibility - ?elapsed, - // Search friendly - numeric - elapsed_secs = elapsed.as_secs_f64(), - // When logging to JSON, one can trigger alerts from the presence of this field. - slow_threshold=?self.settings.slow_statements_duration, - // Make sure to use "slow" in the message as that's likely - // what people will grep for. - "slow statement: execution time exceeded alert threshold" - ); - } else { - private_tracing_dynamic_event!( - target: "sqlx::query", - tracing_level, - summary, - db.statement = sql, - rows_affected = self.rows_affected, - rows_returned = self.rows_returned, - // Human-friendly - includes units (usually ms). Also kept for backward compatibility - ?elapsed, - // Search friendly - numeric - elapsed_secs = elapsed.as_secs_f64(), - ); - } + // Emit the existing close-time event inside the query span so consumers + // see both the span (for OTel correlation) and the event (for the + // backwards-compatible `rows_affected`/`elapsed_secs` fields). + self.span.in_scope(|| { + if was_slow { + private_tracing_dynamic_event!( + target: "sqlx::query", + tracing_level, + summary, + db.statement = sql, + rows_affected = self.rows_affected, + rows_returned = self.rows_returned, + // Human-friendly - includes units (usually ms). Also kept for backward compatibility + ?elapsed, + // Search friendly - numeric + elapsed_secs = elapsed.as_secs_f64(), + // When logging to JSON, one can trigger alerts from the presence of this field. + slow_threshold=?self.settings.slow_statements_duration, + // Make sure to use "slow" in the message as that's likely + // what people will grep for. + "slow statement: execution time exceeded alert threshold" + ); + } else { + private_tracing_dynamic_event!( + target: "sqlx::query", + tracing_level, + summary, + db.statement = sql, + rows_affected = self.rows_affected, + rows_returned = self.rows_returned, + // Human-friendly - includes units (usually ms). Also kept for backward compatibility + ?elapsed, + // Search friendly - numeric + elapsed_secs = elapsed.as_secs_f64(), + ); + } + }); } } } @@ -160,6 +226,37 @@ impl Drop for QueryLogger { } } +pin_project! { + /// Wraps a [`Stream`] so each `poll_next` runs inside the given [`Span`]. + /// + /// This is the `Stream` counterpart to `tracing::Instrument` for futures. It + /// re-enters the span on every poll and drops the guard before yielding, so no + /// `EnteredSpan` is ever held across an await point — fixing the `!Send` issue + /// that sank #3176. The inner stream is projected via `pin-project-lite`, so this + /// adds no allocation and keeps the module free of `unsafe` pin code. + pub struct InstrumentedStream { + #[pin] + inner: S, + span: Span, + } +} + +impl InstrumentedStream { + pub fn new(inner: S, span: Span) -> Self { + Self { inner, span } + } +} + +impl Stream for InstrumentedStream { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let _enter = this.span.enter(); + this.inner.poll_next(cx) + } +} + pub fn parse_query_summary(sql: &str) -> String { // For now, just take the first 4 words sql.split_whitespace() @@ -167,3 +264,157 @@ pub fn parse_query_summary(sql: &str) -> String { .collect::>() .join(" ") } + +#[cfg(test)] +mod tests { + use super::*; + use crate::sql_str::SqlSafeStr; + use std::sync::{Arc, Mutex}; + use tracing::field::{Field, Visit}; + use tracing::span::{Attributes, Record}; + use tracing::subscriber::{with_default, Subscriber}; + use tracing::{Event, Id, Metadata}; + + struct CapturedSpan { + name: &'static str, + target: String, + level: tracing::Level, + fields: std::collections::HashMap, + closed: bool, + contained_events: usize, + } + + #[derive(Default)] + struct CaptureSubscriber { + next_id: std::sync::atomic::AtomicU64, + spans: Mutex>, + current: Mutex>, + } + + struct StringVisitor<'a>(&'a mut std::collections::HashMap); + impl Visit for StringVisitor<'_> { + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + self.0 + .insert(field.name().to_string(), format!("{value:?}")); + } + fn record_str(&mut self, field: &Field, value: &str) { + self.0.insert(field.name().to_string(), value.to_string()); + } + fn record_u64(&mut self, field: &Field, value: u64) { + self.0.insert(field.name().to_string(), value.to_string()); + } + fn record_i64(&mut self, field: &Field, value: i64) { + self.0.insert(field.name().to_string(), value.to_string()); + } + fn record_bool(&mut self, field: &Field, value: bool) { + self.0.insert(field.name().to_string(), value.to_string()); + } + } + + impl Subscriber for CaptureSubscriber { + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, attrs: &Attributes<'_>) -> Id { + let id = self + .next_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + + 1; + let mut span = CapturedSpan { + name: attrs.metadata().name(), + target: attrs.metadata().target().to_string(), + level: *attrs.metadata().level(), + fields: std::collections::HashMap::new(), + closed: false, + contained_events: 0, + }; + attrs.record(&mut StringVisitor(&mut span.fields)); + self.spans.lock().unwrap().insert(id, span); + Id::from_u64(id) + } + fn record(&self, span: &Id, values: &Record<'_>) { + if let Some(s) = self.spans.lock().unwrap().get_mut(&span.into_u64()) { + values.record(&mut StringVisitor(&mut s.fields)); + } + } + fn record_follows_from(&self, _: &Id, _: &Id) {} + fn event(&self, _event: &Event<'_>) { + let current = self.current.lock().unwrap(); + if let Some(&id) = current.last() { + if let Some(s) = self.spans.lock().unwrap().get_mut(&id) { + s.contained_events += 1; + } + } + } + fn enter(&self, span: &Id) { + self.current.lock().unwrap().push(span.into_u64()); + } + fn exit(&self, _span: &Id) { + self.current.lock().unwrap().pop(); + } + fn try_close(&self, id: Id) -> bool { + if let Some(s) = self.spans.lock().unwrap().get_mut(&id.into_u64()) { + s.closed = true; + } + true + } + } + + #[test] + fn query_logger_opens_and_closes_span_with_expected_fields() { + let subscriber = Arc::new(CaptureSubscriber::default()); + with_default(subscriber.clone(), || { + let settings = LogSettings::default(); + let sql = "SELECT id, name FROM users WHERE id = 1".into_sql_str(); + let mut logger = QueryLogger::new(sql, settings).with_db_system_name("postgresql"); + logger.increment_rows_returned(); + logger.increment_rows_returned(); + logger.increase_rows_affected(2); + drop(logger); + }); + + let spans = subscriber.spans.lock().unwrap(); + assert_eq!(spans.len(), 1, "exactly one span should be opened"); + let span = spans.values().next().unwrap(); + + assert_eq!(span.name, "db.query"); + assert_eq!(span.target, "sqlx::query"); + assert_eq!(span.level, tracing::Level::INFO); + assert!(span.closed, "span must close on QueryLogger drop"); + assert!( + span.contained_events >= 1, + "the close-time event should fire inside the span" + ); + + assert_eq!( + span.fields.get("db.system.name").map(String::as_str), + Some("postgresql") + ); + assert_eq!( + span.fields.get("db.operation.name").map(String::as_str), + Some("SELECT") + ); + assert_eq!( + span.fields.get("otel.kind").map(String::as_str), + Some("client") + ); + assert!(span + .fields + .get("db.query.text") + .is_some_and(|s| s.contains("SELECT id, name FROM users"))); + assert_eq!( + span.fields + .get("db.response.returned_rows") + .map(String::as_str), + Some("2"), + "rows_returned must be recorded on the span before close" + ); + assert_eq!( + span.fields + .get("db.response.affected_rows") + .map(String::as_str), + Some("2"), + "rows_affected must be recorded on the span before close" + ); + } +} diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index ee59d03d0a..2234e6aff0 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -4,7 +4,7 @@ use crate::error::Error; use crate::executor::{Execute, Executor}; use crate::ext::ustr::UStr; use crate::io::MySqlBufExt; -use crate::logger::QueryLogger; +use crate::logger::{InstrumentedStream, QueryLogger}; use crate::protocol::response::Status; use crate::protocol::statement::{ BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose, @@ -107,12 +107,14 @@ impl MySqlConnection { persistent: bool, ) -> Result, Error>> + 'e, Error> { - let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone()); + let mut logger = + QueryLogger::new(sql, self.inner.log_settings.clone()).with_db_system_name("mysql"); + let span = logger.span(); self.inner.stream.wait_until_ready().await?; self.inner.stream.waiting.push_back(Waiting::Result); - Ok(try_stream! { + let stream = try_stream! { let sql = logger.sql().as_str(); // make a slot for the shared column data @@ -266,7 +268,9 @@ impl MySqlConnection { r#yield!(v); } } - }) + }; + + Ok(InstrumentedStream::new(stream, span)) } } diff --git a/sqlx-postgres/src/connection/executor.rs b/sqlx-postgres/src/connection/executor.rs index e0f4c3d44a..8c0170bbb9 100644 --- a/sqlx-postgres/src/connection/executor.rs +++ b/sqlx-postgres/src/connection/executor.rs @@ -1,7 +1,7 @@ use crate::error::Error; use crate::executor::{Execute, Executor}; use crate::io::{PortalId, StatementId}; -use crate::logger::QueryLogger; +use crate::logger::{InstrumentedStream, QueryLogger}; use crate::message::{ self, BackendMessageFormat, Bind, Close, CommandComplete, DataRow, ParameterDescription, Parse, ParseComplete, RowDescription, @@ -203,7 +203,9 @@ impl PgConnection { persistent: bool, metadata_opt: Option>, ) -> Result, Error>> + 'e, Error> { - let mut logger = QueryLogger::new(query, self.inner.log_settings.clone()); + let mut logger = QueryLogger::new(query, self.inner.log_settings.clone()) + .with_db_system_name("postgresql"); + let span = logger.span(); let sql = logger.sql().as_str(); // before we continue, wait until we are "ready" to accept more queries @@ -292,7 +294,7 @@ impl PgConnection { self.inner.stream.flush().await?; - Ok(try_stream! { + let stream = try_stream! { loop { let message = self.inner.stream.recv().await?; @@ -372,7 +374,9 @@ impl PgConnection { } Ok(()) - }) + }; + + Ok(InstrumentedStream::new(stream, span)) } } diff --git a/sqlx-sqlite/src/connection/execute.rs b/sqlx-sqlite/src/connection/execute.rs index 733a1abbe6..58768458e9 100644 --- a/sqlx-sqlite/src/connection/execute.rs +++ b/sqlx-sqlite/src/connection/execute.rs @@ -29,7 +29,7 @@ pub(crate) fn iter( // fetch the cached statement or allocate a new one let statement = conn.statements.get(query.as_str(), persistent)?; - let logger = QueryLogger::new(query, conn.log_settings.clone()); + let logger = QueryLogger::new(query, conn.log_settings.clone()).with_db_system_name("sqlite"); Ok(ExecuteIter { handle: &mut conn.handle,