Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,8 @@ class OP:
COHERE_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.cohere"
COHERE_EMBEDDINGS_CREATE = "ai.embeddings.create.cohere"
DB = "db"
DB_CURSOR_ITERATOR = "db.cursor.iter"
DB_CURSOR_FETCH = "db.cursor.fetch"
DB_REDIS = "db.redis"
EVENT_DJANGO = "event.django"
FUNCTION = "function"
Expand Down
16 changes: 14 additions & 2 deletions sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
)

try:
import asyncpg # type: ignore[import-not-found]
from asyncpg.cursor import BaseCursor # type: ignore
import asyncpg # type: ignore
from asyncpg.cursor import ( # type: ignore
BaseCursor,
Cursor,
CursorIterator,
)

except ImportError:
raise DidNotEnable("asyncpg not installed.")
Expand Down Expand Up @@ -169,6 +173,13 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
return await f(*args, **kwargs)

cursor = args[0]
if type(cursor) is CursorIterator:
span_op_override_value = OP.DB_CURSOR_ITERATOR
elif type(cursor) is Cursor:
span_op_override_value = OP.DB_CURSOR_FETCH
else:
span_op_override_value = None

query = _normalize_query(cursor._query)
with record_sql_queries(
cursor=cursor,
Expand All @@ -178,6 +189,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
executemany=False,
record_cursor_repr=True,
span_origin=AsyncPGIntegration.origin,
span_op_override_value=span_op_override_value,
) as span:
_set_db_data(span, cursor._connection)
res = await f(*args, **kwargs)
Expand Down
7 changes: 5 additions & 2 deletions sentry_sdk/tracing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def record_sql_queries(
executemany: bool,
record_cursor_repr: bool = False,
span_origin: str = "manual",
span_op_override_value: "Optional[str]" = None,
) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]":
# TODO: Bring back capturing of params by default
client = sentry_sdk.get_client()
Expand Down Expand Up @@ -167,13 +168,15 @@ def record_sql_queries(
name="<unknown SQL query>" if query is None else query,
attributes={
"sentry.origin": span_origin,
"sentry.op": OP.DB,
"sentry.op": span_op_override_value
if span_op_override_value
else OP.DB,
Comment on lines +171 to +173

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The check for span_op_override_value is inconsistent; one path uses a truthiness check while another uses is not None, causing different behavior for empty strings.
Severity: LOW

Suggested Fix

Standardize the conditional check for span_op_override_value in both the streaming and non-streaming code paths. Use the more explicit is not None check in both locations to ensure consistent behavior for all possible string values.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: sentry_sdk/tracing_utils.py#L171-L173

Potential issue: The function `record_sql_queries` checks the `span_op_override_value`
parameter inconsistently across two code paths. The streaming path uses a truthiness
check (`if span_op_override_value`), which would cause an empty string to be evaluated
as false, falling back to the default `OP.DB`. The non-streaming path uses an explicit
`is not None` check, which would correctly use an empty string as the operation name.
While no current callers pass an empty string, this inconsistency creates a latent bug
that could be triggered by future code changes, leading to incorrect span operation
names.

Also affects:

  • sentry_sdk/tracing_utils.py:179~179

Did we get this right? 👍 / 👎 to inform future reviews.

},
) as span:
yield span
else:
with sentry_sdk.start_span(
op=OP.DB,
op=span_op_override_value if span_op_override_value is not None else OP.DB,
name=query,
origin=span_origin,
) as span:
Expand Down
116 changes: 35 additions & 81 deletions tests/integrations/asyncpg/test_asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import sentry_sdk
from sentry_sdk import capture_message, start_transaction
from sentry_sdk.consts import SPANDATA
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.asyncpg import AsyncPGIntegration
from sentry_sdk.tracing_utils import record_sql_queries
from tests.conftest import ApproxDict
Expand Down Expand Up @@ -1361,75 +1361,57 @@ async def test_query_source_prepare(

@pytest.mark.asyncio
@pytest.mark.parametrize("span_streaming", [True, False])
async def test_cursor__bind_exec_creates_spans(
async def test_cursor_iteration_creates_db_cursor_iter_spans(
sentry_init, capture_events, capture_items, span_streaming
) -> None:
"""
Exercises the bind_exec patch through the iterator that's created in asyncpg when "for record in conn.cursor" is called.
See https://github.com/MagicStack/asyncpg/blob/db8ecc2a38e16fb0c090aef6f5506547c2831c24/asyncpg/cursor.py#L234
Regression test for https://github.com/getsentry/sentry-python/issues/6576

When iterating a server-side cursor with a small prefetch, asyncpg fetches
rows in batches. Each batch triggers BaseCursor._bind_exec (on first query) and
BaseCursor._exec (second query onwards) through CursorIterator.__anext__, which creates a
span with the same query description. The resulting burst of identical spans
causes Sentry's N+1 query detector to raise a false positive.

To mitigate, we set the "op"/"sentry.op" to `db.cursor.iter` instead of `db`
so that the sentry backend can exclude these spans from n+1 detection.
"""
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
enable_db_query_source=True,
db_query_source_threshold_ms=0,
_experiments={
"trace_lifecycle": "stream" if span_streaming else "static",
},
)

if span_streaming:
items = capture_items("span")

with sentry_sdk.traces.start_span(name="test_segment"):
conn: Connection = await connect(PG_CONNECTION_URI)

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
("Alice", "pw", datetime.date(1990, 12, 25)),
],
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
)

async with conn.transaction():
async for record in conn.cursor(
"SELECT * FROM users WHERE dob > $1",
datetime.date(1970, 1, 1),
):
async for _record in conn.cursor("SELECT * FROM users", prefetch=5):
pass

await conn.close()
sentry_sdk.flush()

spans = [item.payload for item in items]

assert len(spans) == 6

connect_span = spans[0]
executemany_span = spans[1]
begin_span = spans[2]
bind_exec_span = spans[3]
commit_span = spans[4]
segment = spans[5]
sentry_sdk.flush()

assert connect_span["name"] == "connect"
assert (
executemany_span["name"]
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
)
assert begin_span["name"] == "BEGIN;"
assert bind_exec_span["name"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["name"] == "COMMIT;"
assert segment["name"] == "test_segment"
cursor_iter_spans = [
item.payload
for item in items
if item.payload.get("name") == "SELECT * FROM users"
]

assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg"
assert bind_exec_span["attributes"]["sentry.op"] == "db"
assert bind_exec_span["attributes"]["db.system.name"] == "postgresql"
assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg"
assert bind_exec_span["attributes"]["server.address"] == PG_HOST
assert bind_exec_span["attributes"]["server.port"] == PG_PORT
assert bind_exec_span["attributes"]["db.namespace"] == PG_NAME
assert bind_exec_span["attributes"]["db.user"] == PG_USER
assert len(cursor_iter_spans) == 5
for span in cursor_iter_spans:
assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR
else:
events = capture_events()

Expand All @@ -1438,57 +1420,28 @@ async def test_cursor__bind_exec_creates_spans(

await conn.executemany(
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
[
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
("Alice", "pw", datetime.date(1990, 12, 25)),
],
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
)

async with conn.transaction():
async for record in conn.cursor(
"SELECT * FROM users WHERE dob > $1",
datetime.date(1970, 1, 1),
):
async for _record in conn.cursor("SELECT * FROM users", prefetch=5):
pass

await conn.close()

(event,) = events

assert len(event["spans"]) == 5

connect_span = event["spans"][0]
executemany_span = event["spans"][1]
begin_span = event["spans"][2]
bind_exec_span = event["spans"][3]
commit_span = event["spans"][4]
cursor_iter_spans = [
s for s in event["spans"] if s.get("description") == "SELECT * FROM users"
]

assert connect_span["description"] == "connect"
assert (
executemany_span["description"]
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
)
assert begin_span["description"] == "BEGIN;"
assert bind_exec_span["description"] == "SELECT * FROM users WHERE dob > $1"
assert commit_span["description"] == "COMMIT;"

assert bind_exec_span["origin"] == "auto.db.asyncpg"
assert bind_exec_span["data"]["db.system"] == "postgresql"
assert bind_exec_span["data"]["db.driver.name"] == "asyncpg"
assert bind_exec_span["data"]["server.address"] == PG_HOST
assert bind_exec_span["data"]["server.port"] == PG_PORT
assert bind_exec_span["data"]["db.name"] == PG_NAME
assert bind_exec_span["data"]["db.user"] == PG_USER

_assert_query_source(
bind_exec_span,
span_streaming,
"test_cursor__bind_exec_creates_spans",
)
assert len(cursor_iter_spans) == 5
for span in cursor_iter_spans:
assert span["op"] == OP.DB_CURSOR_ITERATOR


@pytest.mark.asyncio
async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> None:
async def test_cursor_fetch_methods_create_spans(sentry_init, capture_events) -> None:
sentry_init(
integrations=[AsyncPGIntegration()],
traces_sample_rate=1.0,
Expand Down Expand Up @@ -1543,9 +1496,10 @@ async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) ->
assert span["data"]["db.cursor"] is not None
assert span["data"]["db.system"] == "postgresql"
assert span["data"]["db.driver.name"] == "asyncpg"
assert span["op"] == OP.DB_CURSOR_FETCH
assert span["origin"] == "auto.db.asyncpg"
_assert_query_source(
span,
False,
"test_cursor__exec_methods_create_spans",
"test_cursor_fetch_methods_create_spans",
)
Loading