Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions tests/aio/query/test_query_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,118 @@ async def slow_rollback_call(settings=None):
async with pool.checkout() as session:
assert session.session_id != session_id_before

@pytest.mark.asyncio
async def test_successful_stream_keeps_session_active(self, tx: QueryTxContext):
    # Guard for the issue #812 fix. The stream's _next handler now catches
    # BaseException rather than Exception, so it has to let
    # StopAsyncIteration end the stream without routing it to
    # _on_execute_stream_error; otherwise every ordinary SELECT would
    # invalidate its session.
    await tx.begin()

    stream = await tx.execute("SELECT 1")
    async with stream as rows:
        # Consume the whole result set so the stream terminates normally.
        async for _row in rows:
            pass

    assert tx.session.is_active

@pytest.mark.asyncio
async def test_query_level_error_keeps_session_active(self, pool):
    # Guard for the issue #812 fix. _on_execute_stream_error invalidates the
    # session for a wider set of failures (CancelledError, SessionBusy,
    # BadSession, ConnectionError, Cancelled, DeadlineExceed). A query-level
    # failure such as a syntax error must NOT be one of them: the statement
    # is bad, the session is not.
    async with pool.checkout() as session:
        original_id = session.session_id

        with pytest.raises(ydb.issues.Error):
            broken = await session.execute("this is not valid SQL")
            async with broken as rows:
                async for _row in rows:
                    pass
        assert session.is_active

        # The very same session has to stay usable for a follow-up statement.
        healthy = await session.execute("SELECT 1")
        async with healthy as rows:
            async for _row in rows:
                pass
        assert session.session_id == original_id
        assert session.is_active

@pytest.mark.asyncio
async def test_cancelled_error_during_stream_invalidates_session(self, tx: QueryTxContext):
    # Regression for issue #812. After a gRPC aio call has been cancelled,
    # every later read on it raises asyncio.CancelledError. CancelledError
    # derives from BaseException, so the previous _next handler (which only
    # caught Exception) never called _on_error and the session went back to
    # the pool in a bad state. Check that the hook now fires on
    # CancelledError and invalidates the session.
    await tx.begin()

    from ydb.aio._utilities import AsyncResponseIterator

    stream = await tx.execute("SELECT 1")

    async def cancelled_next(_iterator):
        raise asyncio.CancelledError("simulated grpc.aio call cancellation")

    # Patch the base iterator so the very next read behaves like a read on a
    # cancelled call.
    patched_next = mock.patch.object(AsyncResponseIterator, "_next", cancelled_next)
    with patched_next, pytest.raises(asyncio.CancelledError):
        async for _row in stream:
            pass

    assert not tx.session.is_active

@pytest.mark.asyncio
async def test_session_busy_during_stream_invalidates_session(self, tx: QueryTxContext):
    # Regression for issue #812, case 2. SessionBusy coming out of the stream
    # means the server still treats the previous query as in flight; reusing
    # the session would poison whoever gets it next, so it must be
    # invalidated.
    await tx.begin()

    from ydb.aio._utilities import AsyncResponseIterator

    stream = await tx.execute("SELECT 1")

    async def busy_next(_iterator):
        raise ydb.issues.SessionBusy("simulated SessionBusy from stream")

    # Patch the base iterator so the next read reports SessionBusy.
    patched_next = mock.patch.object(AsyncResponseIterator, "_next", busy_next)
    with patched_next, pytest.raises(ydb.issues.SessionBusy):
        async for _row in stream:
            pass

    assert not tx.session.is_active

@pytest.mark.asyncio
async def test_cancelled_stream_in_pool_does_not_poison_next_caller(self, pool):
    # Regression for issue #812: a single mis-timed cancel during
    # tx.execute(...) iteration should not leave the session in a state
    # where the next unrelated caller sees SessionBusy or otherwise reuses
    # a session the server considers busy.
    from ydb.aio._utilities import AsyncResponseIterator

    victim_sid = None

    async def raising_next(self):
        raise asyncio.CancelledError("simulated grpc.aio call cancellation")

    async def victim(tx):
        nonlocal victim_sid
        victim_sid = tx.session.session_id
        results = await tx.execute("SELECT 1")

        # mock.patch.object puts the original _next back when the `with`
        # block exits, including on the CancelledError raised below, so no
        # manual restore is needed.
        with mock.patch.object(AsyncResponseIterator, "_next", raising_next):
            async for _ in results:
                pass

    with pytest.raises(asyncio.CancelledError):
        await pool.retry_tx_async(victim)

    # Make sure the comparison below is meaningful: the victim callback must
    # actually have run and recorded its session id.
    assert victim_sid is not None

    async with pool.checkout() as session:
        assert session.session_id != victim_sid

@pytest.mark.asyncio
async def test_rollback_after_tli_aborted_is_safe(self, pool, table_path: str):
# Given: a row in a table
Expand Down
45 changes: 43 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,31 @@ def pytest_addoption(parser):
default="docker-compose.yml",
help="Path to docker-compose file (relative to project root)",
)
parser.addoption(
"--ydb-endpoint",
action="store",
default=None,
help=(
"Use an already-running YDB endpoint (e.g. localhost:2136) instead of spinning "
"a container via docker-compose. Also honored from the YDB_ENDPOINT env var. "
"Tests that explicitly restart the container via the `docker_project` fixture "
"(chaos-style) are incompatible with this mode."
),
)


def _running_ydb_endpoint(pytestconfig):
    """Return the user-supplied endpoint of an already-running YDB, or None.

    The ``--ydb-endpoint`` command-line option takes precedence; the
    ``YDB_ENDPOINT`` environment variable is consulted only when the option
    is unset or empty. A leading ``grpcs://`` or ``grpc://`` scheme is
    removed so the result is in "host:port" form.
    """
    configured = pytestconfig.getoption("--ydb-endpoint")
    if not configured:
        configured = os.environ.get("YDB_ENDPOINT")
    if not configured:
        return None

    # At most one scheme prefix is stripped; with no match the value is
    # returned unchanged (empty prefix -> slice from 0).
    schemes = ("grpcs://", "grpc://")
    matched = next((scheme for scheme in schemes if configured.startswith(scheme)), "")
    return configured[len(matched):]


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -156,8 +181,24 @@ def is_ydb_secure_responsive(endpoint, root_certificates):


@pytest.fixture(scope="module")
def endpoint(docker_ip, docker_services):
"""Wait for YDB to be responsive and return endpoint."""
def endpoint(pytestconfig, request):
"""Wait for YDB to be responsive and return endpoint.

If --ydb-endpoint / YDB_ENDPOINT is set, return it directly without
touching pytest-docker — this lets tests run against an already-running
container.
"""
existing = _running_ydb_endpoint(pytestconfig)
if existing is not None:
if not is_ydb_responsive(existing):
raise RuntimeError(f"--ydb-endpoint={existing} is not responsive")
yield existing
return

# Pytest-docker path: resolve docker_services lazily so the fixtures are
# only requested when we actually need to spin a container.
docker_ip = request.getfixturevalue("docker_ip")
docker_services = request.getfixturevalue("docker_services")
port = docker_services.port_for("ydb", 2136)
endpoint_url = f"{docker_ip}:{port}"
docker_services.wait_until_responsive(
Expand Down
26 changes: 22 additions & 4 deletions ydb/aio/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,30 @@ async def __aenter__(self) -> "AsyncResponseContextIterator":
async def _next(self):
    """Return the next item of the stream, reporting failures to ``_on_error``.

    StopAsyncIteration is re-raised untouched. Any other exception is passed
    to ``self._on_error`` (when one is set) and then re-raised with its
    original traceback.
    """
    try:
        return await super()._next()
    except StopAsyncIteration:
        # Normal stream termination is not an error and must not invalidate
        # the session.
        raise
    except BaseException as e:
        # BaseException (not Exception) because asyncio.CancelledError
        # inherits from BaseException in Python 3.8+. A stream interrupted
        # by a cancel must also be reported to _on_error so the session can
        # be invalidated; otherwise the next caller that picks this session
        # out of the pool races the undrained stream and the server can
        # reply with SessionBusy.
        if self._on_error:
            self._on_error(e)
        raise

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Drain the rest of the stream when the ``async with`` block is left.

    To close the stream on YDB it is necessary to scroll through it to the
    end. Always returns None, so an exception raised by the ``async with``
    body is never suppressed.

    Raises:
        BaseException: whatever the drain raised, but only when the body
            exited without an exception of its own.
    """
    try:
        async for _ in self:
            pass
    except BaseException:
        # With a primary exception already propagating out of the body, a
        # drain failure must not replace it, so it is dropped here. On a
        # clean exit there is nothing to mask: swallowing would hide a
        # failed query or a task cancellation from the caller, so the
        # error is propagated.
        if exc_type is None:
            raise
23 changes: 19 additions & 4 deletions ydb/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,29 @@ def __enter__(self) -> "SyncResponseContextIterator":
def _next(self):
    """Return the next item of the stream, reporting failures to ``_on_error``.

    StopIteration is re-raised untouched. Any other exception is passed to
    ``self._on_error`` (when one is set) and then re-raised with its original
    traceback.
    """
    try:
        return super()._next()
    except StopIteration:
        # Normal stream termination is not an error and must not invalidate
        # the session.
        raise
    except BaseException as e:
        # BaseException (not Exception) for parity with the async iterator:
        # KeyboardInterrupt / SystemExit should still invalidate the session
        # before they propagate, otherwise the next caller that reuses the
        # session races the undrained stream and the server can reply with
        # SessionBusy.
        if self._on_error:
            self._on_error(e)
        raise

def __exit__(self, exc_type, exc_val, exc_tb):
    """Drain the rest of the stream when the ``with`` block is left.

    To close the stream on YDB it is necessary to scroll through it to the
    end. Always returns None, so an exception raised by the ``with`` body is
    never suppressed.

    Raises:
        BaseException: whatever the drain raised, but only when the body
            exited without an exception of its own.
    """
    try:
        for _ in self:
            pass
    except BaseException:
        # With a primary exception already propagating out of the body, a
        # drain failure must not replace it, so it is dropped here. On a
        # clean exit there is nothing to mask: swallowing would hide a
        # failed query, KeyboardInterrupt or SystemExit from the caller, so
        # the error is propagated.
        if exc_type is None:
            raise


Expand Down
29 changes: 27 additions & 2 deletions ydb/query/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,33 @@ def _invalidate(self) -> None:
except Exception:
pass

def _on_execute_stream_error(self, e: BaseException) -> None:
    """Invalidate the session when an execute-stream error makes it unsafe to reuse.

    Args:
        e: the exception raised while reading the execute stream.
    """
    # The execute stream is a single gRPC call that carries all of a
    # query's response parts. If any of these errors surface while reading
    # it, the server-side stream is either known-dead (BadSession,
    # ConnectionError, DeadlineExceed) or undrained and un-resumable
    # (SessionBusy: server thinks this session still has the previous
    # query running; Cancelled: the call has been torn down mid-flight),
    # which means a subsequent query on the same session can race the
    # stragglers and get a spurious SessionBusy back. Drop the session so
    # the pool creates a fresh one on the next acquire.
    session_fatal = (
        issues.DeadlineExceed,
        issues.SessionBusy,
        issues.BadSession,
        issues.ConnectionError,
        issues.Cancelled,
    )
    # Accepts BaseException so that asyncio.CancelledError (not an
    # issues.Error subclass) also invalidates here: anything that is not an
    # issues.Error invalidates, while other issues.Error subclasses (e.g.
    # query-level errors) leave the session alone.
    if not isinstance(e, issues.Error) or isinstance(e, session_fatal):
        self._invalidate()

# Overloads for _create_call
Expand Down
Loading