Skip to content

Commit 3260ec9

Browse files
committed
PYTHON-5676 Consolidate per-command execution
- Consolidate per-command execution into a single path — a shared _network_command_core in network.py that both network.command() and cursor run_operation() flow through, plus a _CommandTelemetry context manager (pymongo/_telemetry.py) centralizing command debug logging only (APM publishing stays inline). - Remove the dead legacy OP_QUERY path and the higher-level command/write_command/unack_write methods from AsyncConnection in favor of free functions in network.py (including bulk write paths), migrating all call sites.
1 parent 079e329 commit 3260ec9

30 files changed

Lines changed: 1532 additions & 1941 deletions

pymongo/_telemetry.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
# Copyright 2025-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unified per-command telemetry: logging, monitoring, and OpenTelemetry.
16+
17+
Currently wires the command logging channel; APM event publishing and
18+
OpenTelemetry spans are layered on top of :class:`_CommandTelemetry`.
19+
"""
20+
from __future__ import annotations
21+
22+
import datetime
23+
import logging
24+
from typing import Any, Mapping, Optional
25+
26+
from pymongo.errors import NotPrimaryError, OperationFailure
27+
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
28+
from pymongo.message import _convert_exception
29+
30+
31+
class _CommandTelemetry:
32+
"""Context manager for per-command telemetry.
33+
34+
Currently wires the command *logging* channel only; the name reflects the
35+
intended scope as the APM and OpenTelemetry channels are added on top.
36+
37+
Logs the ``STARTED`` event on entry, then ``SUCCEEDED`` or ``FAILED`` once
38+
the outcome is known. Call :meth:`handle_succeeded` with the server reply
39+
on success or :meth:`handle_failed` with the raised exception on error; if
40+
an exception propagates out of the ``with`` block without either being
41+
called, the ``FAILED`` event is logged automatically from ``__exit__``.
42+
43+
This consolidates command *logging* only -- APM event publishing remains
44+
at the call site. The context manager owns the duration clock (from the
45+
``start`` time passed in) and exposes it via :attr:`duration`, and stores
46+
the computed failure document on :attr:`failure`, so callers can reuse both
47+
for APM events. A future change can extend this class to publish monitoring
48+
(and OpenTelemetry) events alongside logging.
49+
50+
Usage::
51+
52+
with _CommandTelemetry(client, conn, cmd, dbname, request_id, start) as cmd_telemetry:
53+
reply = do_network_call()
54+
duration = cmd_telemetry.handle_succeeded(reply)
55+
# Failures are logged automatically in __exit__.
56+
"""
57+
58+
__slots__ = (
59+
"_client",
60+
"_conn",
61+
"_spec",
62+
"_dbname",
63+
"_request_id",
64+
"_operation_id",
65+
"_log",
66+
"_start",
67+
"duration",
68+
"failure",
69+
"_handled",
70+
)
71+
72+
def __init__(
73+
self,
74+
client: Any,
75+
conn: Any,
76+
spec: Mapping[str, Any],
77+
dbname: str,
78+
request_id: int,
79+
start: datetime.datetime,
80+
log: bool = True,
81+
operation_id: Optional[int] = None,
82+
) -> None:
83+
self._client = client
84+
self._conn = conn
85+
self._spec = spec
86+
self._dbname = dbname
87+
self._request_id = request_id
88+
self._operation_id = request_id if operation_id is None else operation_id
89+
# Whether command debug-logging is wanted for this command. Passed
90+
# explicitly by the caller rather than inferred from ``client``.
91+
self._log = log
92+
self._start = start
93+
self.duration: Optional[datetime.timedelta] = None
94+
self.failure: Any = None
95+
self._handled = False
96+
97+
def _enabled(self) -> bool:
98+
return self._log and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG)
99+
100+
def __enter__(self) -> _CommandTelemetry:
101+
if self._enabled():
102+
_debug_log(
103+
_COMMAND_LOGGER,
104+
message=_CommandStatusMessage.STARTED,
105+
clientId=self._client._topology_settings._topology_id,
106+
command=self._spec,
107+
commandName=next(iter(self._spec)),
108+
databaseName=self._dbname,
109+
requestId=self._request_id,
110+
operationId=self._operation_id,
111+
driverConnectionId=self._conn.id,
112+
serverConnectionId=self._conn.server_connection_id,
113+
serverHost=self._conn.address[0],
114+
serverPort=self._conn.address[1],
115+
serviceId=self._conn.service_id,
116+
)
117+
return self
118+
119+
def handle_succeeded(
120+
self,
121+
reply: Any,
122+
speculative_hello: bool = False,
123+
) -> datetime.timedelta:
124+
"""Log the ``SUCCEEDED`` event and return the elapsed duration."""
125+
self.duration = datetime.datetime.now() - self._start
126+
if self._enabled():
127+
_debug_log(
128+
_COMMAND_LOGGER,
129+
message=_CommandStatusMessage.SUCCEEDED,
130+
clientId=self._client._topology_settings._topology_id,
131+
durationMS=self.duration,
132+
reply=reply,
133+
commandName=next(iter(self._spec)),
134+
databaseName=self._dbname,
135+
requestId=self._request_id,
136+
operationId=self._operation_id,
137+
driverConnectionId=self._conn.id,
138+
serverConnectionId=self._conn.server_connection_id,
139+
serverHost=self._conn.address[0],
140+
serverPort=self._conn.address[1],
141+
serviceId=self._conn.service_id,
142+
speculative_authenticate=speculative_hello,
143+
)
144+
self._handled = True
145+
return self.duration
146+
147+
def handle_failed(
148+
self,
149+
exc: BaseException,
150+
failure: Optional[Any] = None,
151+
is_server_side_error: Optional[bool] = None,
152+
) -> datetime.timedelta:
153+
"""Log the ``FAILED`` event and return the elapsed duration.
154+
155+
The failure document and server-side-error flag are derived from *exc*
156+
for the common case. Callers that must transform the failure document
157+
(e.g. unacknowledged bulk writes) pass *failure* explicitly. The
158+
computed failure is stored on :attr:`failure` for reuse by APM events.
159+
"""
160+
self.duration = datetime.datetime.now() - self._start
161+
if failure is None:
162+
if isinstance(exc, (NotPrimaryError, OperationFailure)):
163+
failure = exc.details
164+
else:
165+
failure = _convert_exception(exc) # type: ignore[arg-type]
166+
if is_server_side_error is None:
167+
is_server_side_error = isinstance(exc, OperationFailure)
168+
self.failure = failure
169+
if self._enabled():
170+
_debug_log(
171+
_COMMAND_LOGGER,
172+
message=_CommandStatusMessage.FAILED,
173+
clientId=self._client._topology_settings._topology_id,
174+
durationMS=self.duration,
175+
failure=failure,
176+
commandName=next(iter(self._spec)),
177+
databaseName=self._dbname,
178+
requestId=self._request_id,
179+
operationId=self._operation_id,
180+
driverConnectionId=self._conn.id,
181+
serverConnectionId=self._conn.server_connection_id,
182+
serverHost=self._conn.address[0],
183+
serverPort=self._conn.address[1],
184+
serviceId=self._conn.service_id,
185+
isServerSideError=is_server_side_error,
186+
)
187+
self._handled = True
188+
return self.duration
189+
190+
def __exit__(
191+
self,
192+
exc_type: Optional[type],
193+
exc_val: Optional[BaseException],
194+
exc_tb: Any,
195+
) -> None:
196+
# Safety net: log a failure if an exception propagates without the
197+
# outcome having been recorded explicitly by the caller.
198+
if exc_val is not None and not self._handled:
199+
self.handle_failed(exc_val)

pymongo/asynchronous/aggregation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from typing import TYPE_CHECKING, Any, Optional, Union
2020

2121
from pymongo import common
22+
from pymongo.asynchronous import network
2223
from pymongo.collation import validate_collation_or_none
2324
from pymongo.errors import ConfigurationError
2425
from pymongo.read_preferences import ReadPreference, _AggWritePref
@@ -159,7 +160,7 @@ async def get_cursor(
159160
write_concern = None
160161

161162
# Run command.
162-
result = await conn.command(
163+
result = await network.command(conn,
163164
self._database.name,
164165
cmd,
165166
read_preference,

pymongo/asynchronous/auth.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from urllib.parse import quote
3434

3535
from bson.binary import Binary
36+
from pymongo.asynchronous import network
3637
from pymongo.asynchronous.auth_aws import _authenticate_aws
3738
from pymongo.asynchronous.auth_oidc import (
3839
_authenticate_oidc,
@@ -96,7 +97,7 @@ async def _authenticate_scram(
9697
res = ctx.speculative_authenticate
9798
else:
9899
nonce, first_bare, cmd = _authenticate_scram_start(credentials, mechanism)
99-
res = await conn.command(source, cmd)
100+
res = await network.command(conn, source, cmd)
100101

101102
assert res is not None
102103
server_first = res["payload"]
@@ -135,7 +136,7 @@ async def _authenticate_scram(
135136
"conversationId": res["conversationId"],
136137
"payload": Binary(client_final),
137138
}
138-
res = await conn.command(source, cmd)
139+
res = await network.command(conn, source, cmd)
139140

140141
parsed = _parse_scram_response(res["payload"])
141142
if not hmac.compare_digest(parsed[b"v"], server_sig):
@@ -149,7 +150,7 @@ async def _authenticate_scram(
149150
"conversationId": res["conversationId"],
150151
"payload": Binary(b""),
151152
}
152-
res = await conn.command(source, cmd)
153+
res = await network.command(conn, source, cmd)
153154
if not res["done"]:
154155
raise OperationFailure("SASL conversation failed to complete.")
155156

@@ -272,7 +273,7 @@ async def _authenticate_gssapi(credentials: MongoCredential, conn: AsyncConnecti
272273
"payload": payload,
273274
"autoAuthorize": 1,
274275
}
275-
response = await conn.command("$external", cmd)
276+
response = await network.command(conn, "$external", cmd)
276277

277278
# Limit how many times we loop to catch protocol / library issues
278279
for _ in range(10):
@@ -287,7 +288,7 @@ async def _authenticate_gssapi(credentials: MongoCredential, conn: AsyncConnecti
287288
"conversationId": response["conversationId"],
288289
"payload": payload,
289290
}
290-
response = await conn.command("$external", cmd)
291+
response = await network.command(conn, "$external", cmd)
291292

292293
if result == kerberos.AUTH_GSS_COMPLETE:
293294
break
@@ -308,7 +309,7 @@ async def _authenticate_gssapi(credentials: MongoCredential, conn: AsyncConnecti
308309
"conversationId": response["conversationId"],
309310
"payload": payload,
310311
}
311-
await conn.command("$external", cmd)
312+
await network.command(conn, "$external", cmd)
312313

313314
finally:
314315
kerberos.authGSSClientClean(ctx)
@@ -329,7 +330,7 @@ async def _authenticate_plain(credentials: MongoCredential, conn: AsyncConnectio
329330
"payload": Binary(payload),
330331
"autoAuthorize": 1,
331332
}
332-
await conn.command(source, cmd)
333+
await network.command(conn, source, cmd)
333334

334335

335336
async def _authenticate_x509(credentials: MongoCredential, conn: AsyncConnection) -> None:
@@ -340,7 +341,7 @@ async def _authenticate_x509(credentials: MongoCredential, conn: AsyncConnection
340341
return
341342

342343
cmd = _X509Context(credentials, conn.address).speculate_command()
343-
await conn.command("$external", cmd)
344+
await network.command(conn, "$external", cmd)
344345

345346

346347
async def _authenticate_default(credentials: MongoCredential, conn: AsyncConnection) -> None:
@@ -351,7 +352,7 @@ async def _authenticate_default(credentials: MongoCredential, conn: AsyncConnect
351352
source = credentials.source
352353
cmd = conn.hello_cmd()
353354
cmd["saslSupportedMechs"] = source + "." + credentials.username
354-
mechs = (await conn.command(source, cmd, publish_events=False)).get(
355+
mechs = (await network.command(conn, source, cmd, publish_events=False)).get(
355356
"saslSupportedMechs", []
356357
)
357358
if "SCRAM-SHA-256" in mechs:

pymongo/asynchronous/auth_aws.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import bson
2121
from bson.binary import Binary
22+
from pymongo.asynchronous import network
2223
from pymongo.errors import ConfigurationError, OperationFailure
2324

2425
if TYPE_CHECKING:
@@ -73,7 +74,7 @@ def bson_decode(self, data: _ReadableBuffer) -> Mapping[str, Any]:
7374
)
7475
client_payload = ctx.step(None)
7576
client_first = {"saslStart": 1, "mechanism": "MONGODB-AWS", "payload": client_payload}
76-
server_first = await conn.command("$external", client_first)
77+
server_first = await network.command(conn, "$external", client_first)
7778
res = server_first
7879
# Limit how many times we loop to catch protocol / library issues
7980
for _ in range(10):
@@ -83,7 +84,7 @@ def bson_decode(self, data: _ReadableBuffer) -> Mapping[str, Any]:
8384
"conversationId": server_first["conversationId"],
8485
"payload": client_payload,
8586
}
86-
res = await conn.command("$external", cmd)
87+
res = await network.command(conn, "$external", cmd)
8788
if res["done"]:
8889
# SASL complete.
8990
break

pymongo/asynchronous/auth_oidc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import bson
2525
from bson.binary import Binary
2626
from pymongo._csot import remaining
27+
from pymongo.asynchronous import network
2728
from pymongo.auth_oidc_shared import (
2829
CALLBACK_VERSION,
2930
HUMAN_CALLBACK_TIMEOUT_SECONDS,
@@ -235,7 +236,7 @@ async def _run_command(
235236
self, conn: AsyncConnection, cmd: MutableMapping[str, Any]
236237
) -> Mapping[str, Any]:
237238
try:
238-
return await conn.command("$external", cmd, no_reauth=True) # type: ignore[call-arg]
239+
return await network.command(conn, "$external", cmd, no_reauth=True) # type: ignore[call-arg]
239240
except OperationFailure as e:
240241
if self._is_auth_error(e):
241242
self._invalidate(conn)

0 commit comments

Comments
 (0)