From ccbd50c65efcb7426ac08cd0fe4146faf821e55c Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 23 Apr 2026 12:48:20 +0300 Subject: [PATCH] Invalidate session on cancelled / transport stream errors --- tests/aio/query/test_query_transaction.py | 112 ++++++++++++++++++++++ tests/conftest.py | 45 ++++++++- ydb/aio/query/base.py | 26 ++++- ydb/query/base.py | 23 ++++- ydb/query/session.py | 29 +++++- 5 files changed, 223 insertions(+), 12 deletions(-) diff --git a/tests/aio/query/test_query_transaction.py b/tests/aio/query/test_query_transaction.py index cc4ff902..2fdca877 100644 --- a/tests/aio/query/test_query_transaction.py +++ b/tests/aio/query/test_query_transaction.py @@ -158,6 +158,118 @@ async def slow_rollback_call(settings=None): async with pool.checkout() as session: assert session.session_id != session_id_before + @pytest.mark.asyncio + async def test_successful_stream_keeps_session_active(self, tx: QueryTxContext): + # Guard for issue #812 fix: the widened _next handler (BaseException + # instead of Exception) must still let StopAsyncIteration terminate + # the stream without feeding it to _on_execute_stream_error — which + # would then invalidate the session on every normal SELECT. + await tx.begin() + + async with await tx.execute("SELECT 1") as results: + async for _ in results: + pass + + assert tx.session.is_active + + @pytest.mark.asyncio + async def test_query_level_error_keeps_session_active(self, pool): + # Guard for issue #812 fix: _on_execute_stream_error now invalidates + # the session on a broader set of errors (CancelledError, SessionBusy, + # BadSession, ConnectionError, Cancelled, DeadlineExceed). Query-level + # errors like syntax errors must NOT invalidate — the session is fine, + # only the statement is bad. + async with pool.checkout() as session: + session_id_before = session.session_id + with pytest.raises(ydb.issues.Error): + async with await session.execute("this is not valid SQL") as results: + async for _ in results: + pass + assert session.is_active + # And the session must remain reusable for the next statement. + async with await session.execute("SELECT 1") as results: + async for _ in results: + pass + assert session.session_id == session_id_before + assert session.is_active + + @pytest.mark.asyncio + async def test_cancelled_error_during_stream_invalidates_session(self, tx: QueryTxContext): + # Regression for issue #812: once a gRPC aio call is cancelled, every + # subsequent read on it raises asyncio.CancelledError. Because + # CancelledError inherits from BaseException, the old _next handler + # (which caught Exception) never invoked _on_error, and the session + # was returned to the pool in a bad state. Verify the hook now runs + # on CancelledError and invalidates the session. + await tx.begin() + + from ydb.aio._utilities import AsyncResponseIterator + + results = await tx.execute("SELECT 1") + + async def raising_next(self): + raise asyncio.CancelledError("simulated grpc.aio call cancellation") + + with mock.patch.object(AsyncResponseIterator, "_next", raising_next): + with pytest.raises(asyncio.CancelledError): + async for _ in results: + pass + + assert not tx.session.is_active + + @pytest.mark.asyncio + async def test_session_busy_during_stream_invalidates_session(self, tx: QueryTxContext): + # Regression for issue #812, case 2: SessionBusy from the stream means + # the server still considers the previous query in flight; continuing + # to use this session poisons the next caller. Must invalidate. + await tx.begin() + + from ydb.aio._utilities import AsyncResponseIterator + + results = await tx.execute("SELECT 1") + + async def raising_next(self): + raise ydb.issues.SessionBusy("simulated SessionBusy from stream") + + with mock.patch.object(AsyncResponseIterator, "_next", raising_next): + with pytest.raises(ydb.issues.SessionBusy): + async for _ in results: + pass + + assert not tx.session.is_active + + @pytest.mark.asyncio + async def test_cancelled_stream_in_pool_does_not_poison_next_caller(self, pool): + # Regression for issue #812: a single mis-timed cancel during + # tx.execute(...) iteration should not leave the session in a state + # where the next unrelated caller sees SessionBusy or otherwise reuses + # a session the server considers busy. + from ydb.aio._utilities import AsyncResponseIterator + + victim_sid = None + + async def victim(tx): + nonlocal victim_sid + victim_sid = tx.session.session_id + results = await tx.execute("SELECT 1") + + orig_next = AsyncResponseIterator._next + + async def raising_next(self): + raise asyncio.CancelledError("simulated grpc.aio call cancellation") + + with mock.patch.object(AsyncResponseIterator, "_next", raising_next): + async for _ in results: + pass + # Restore so later operations in this session (if any) would work. + AsyncResponseIterator._next = orig_next + + with pytest.raises(asyncio.CancelledError): + await pool.retry_tx_async(victim) + + async with pool.checkout() as session: + assert session.session_id != victim_sid + @pytest.mark.asyncio async def test_rollback_after_tli_aborted_is_safe(self, pool, table_path: str): # Given: a row in a table diff --git a/tests/conftest.py b/tests/conftest.py index 9f39151d..7112f987 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -44,6 +44,31 @@ def pytest_addoption(parser): default="docker-compose.yml", help="Path to docker-compose file (relative to project root)", ) + parser.addoption( + "--ydb-endpoint", + action="store", + default=None, + help=( + "Use an already-running YDB endpoint (e.g. localhost:2136) instead of spinning " + "a container via docker-compose. Also honored from the YDB_ENDPOINT env var. " + "Tests that explicitly restart the container via the `docker_project` fixture " + "(chaos-style) are incompatible with this mode." + ), + ) + + +def _running_ydb_endpoint(pytestconfig): + """Return a pre-running endpoint if the user asked for one, else None.""" + existing = pytestconfig.getoption("--ydb-endpoint") or os.environ.get("YDB_ENDPOINT") + if not existing: + return None + # Strip scheme if present — the `endpoint` fixture is expected to return + # the "host:port" form that is_ydb_responsive / driver construction use. + for prefix in ("grpcs://", "grpc://"): + if existing.startswith(prefix): + existing = existing[len(prefix) :] + break + return existing @pytest.fixture(scope="session") @@ -156,8 +181,24 @@ def is_ydb_secure_responsive(endpoint, root_certificates): @pytest.fixture(scope="module") -def endpoint(docker_ip, docker_services): - """Wait for YDB to be responsive and return endpoint.""" +def endpoint(pytestconfig, request): + """Wait for YDB to be responsive and return endpoint. + + If --ydb-endpoint / YDB_ENDPOINT is set, return it directly without + touching pytest-docker — this lets tests run against an already-running + container. + """ + existing = _running_ydb_endpoint(pytestconfig) + if existing is not None: + if not is_ydb_responsive(existing): + raise RuntimeError(f"--ydb-endpoint={existing} is not responsive") + yield existing + return + + # Pytest-docker path: resolve docker_services lazily so the fixtures are + # only requested when we actually need to spin a container. + docker_ip = request.getfixturevalue("docker_ip") + docker_services = request.getfixturevalue("docker_services") port = docker_services.port_for("ydb", 2136) endpoint_url = f"{docker_ip}:{port}" docker_services.wait_until_responsive( diff --git a/ydb/aio/query/base.py b/ydb/aio/query/base.py index 66df3703..a1d6e5d7 100644 --- a/ydb/aio/query/base.py +++ b/ydb/aio/query/base.py @@ -12,12 +12,30 @@ async def __aenter__(self) -> "AsyncResponseContextIterator": async def _next(self): try: return await super()._next() - except Exception as e: + except StopAsyncIteration: + # Normal stream termination is not an error and must not invalidate + # the session. + raise + except BaseException as e: + # BaseException (not Exception) because asyncio.CancelledError + # inherits from BaseException in Python 3.8+. A stream interrupted + # by a cancel must also be reported to _on_error so the session can + # be invalidated; otherwise the next caller that picks this session + # out of the pool races the undrained stream and the server can + # reply with SessionBusy. if self._on_error: self._on_error(e) - raise e + raise async def __aexit__(self, exc_type, exc_val, exc_tb): - # To close stream on YDB it is necessary to scroll through it to the end - async for _ in self: + # To close stream on YDB it is necessary to scroll through it to the end. + # Errors that happen during the cleanup drain have already been reported + # to _on_error inside _next, so swallow them here — re-raising from + # __aexit__ would mask whatever exception is already propagating out of + # the `async with` body and would leave callers (e.g. the tx __aexit__) + # unable to run their own cleanup (rollback). + try: + async for _ in self: + pass + except BaseException: pass diff --git a/ydb/query/base.py b/ydb/query/base.py index e7764e1c..bf0d80b9 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -83,14 +83,29 @@ def __enter__(self) -> "SyncResponseContextIterator": def _next(self): try: return super()._next() - except Exception as e: + except StopIteration: + # Normal stream termination is not an error and must not invalidate + # the session. + raise + except BaseException as e: + # BaseException (not Exception) for parity with the async iterator: + # KeyboardInterrupt / SystemExit should still invalidate the session + # before they propagate, otherwise the next caller that reuses the + # session races the undrained stream and the server can reply with + # SessionBusy. if self._on_error: self._on_error(e) - raise e + raise def __exit__(self, exc_type, exc_val, exc_tb): - # To close stream on YDB it is necessary to scroll through it to the end - for _ in self: + # To close stream on YDB it is necessary to scroll through it to the end. + # Errors during the cleanup drain have already been reported to _on_error + # inside _next; swallow them here so __exit__ does not mask a primary + # exception and the caller's own cleanup (e.g. tx rollback) can still run. + try: + for _ in self: + pass + except BaseException: pass diff --git a/ydb/query/session.py b/ydb/query/session.py index af4b7ec6..b28cba8b 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -136,8 +136,33 @@ def _invalidate(self) -> None: except Exception: pass - def _on_execute_stream_error(self, e: Exception) -> None: - if isinstance(e, issues.DeadlineExceed): + def _on_execute_stream_error(self, e: BaseException) -> None: + # The execute stream is a single gRPC call that carries all of a + # query's response parts. If any of these errors surface while reading + # it, the server-side stream is either known-dead (BadSession, + # ConnectionError, DeadlineExceed) or undrained and un-resumable + # (SessionBusy: server thinks this session still has the previous + # query running; Cancelled: the call has been torn down mid-flight), + # which means a subsequent query on the same session can race the + # stragglers and get a spurious SessionBusy back. Drop the session so + # the pool creates a fresh one on the next acquire. + # + # Accepts BaseException so that asyncio.CancelledError (not an + # issues.Error subclass) — the case documented in the bug report — + # also invalidates here. + if isinstance(e, issues.Error): + if isinstance( + e, + ( + issues.DeadlineExceed, + issues.SessionBusy, + issues.BadSession, + issues.ConnectionError, + issues.Cancelled, + ), + ): + self._invalidate() + else: self._invalidate() # Overloads for _create_call