Skip to content

Stream Cursors#35

Draft
vgvoleg wants to merge 1 commit intomainfrom
stream_cursor
Draft

Stream Cursors#35
vgvoleg wants to merge 1 commit intomainfrom
stream_cursor

Conversation

@vgvoleg
Copy link
Copy Markdown
Member

@vgvoleg vgvoleg commented Apr 24, 2026

Closes: #36

@vgvoleg vgvoleg requested a review from Copilot April 24, 2026 12:48
@vgvoleg vgvoleg marked this pull request as draft April 24, 2026 12:50
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds support for server-side (streaming) cursors to incrementally consume large result sets without buffering everything in memory.

Changes:

  • Introduces StreamCursor / AsyncStreamCursor and adds stream_results=True option to Connection.cursor() / AsyncConnection.cursor().
  • Tracks active streaming cursor per connection to block conflicting operations during interactive transactions.
  • Expands unit + integration test coverage and updates README with streaming usage examples.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
ydb_dbapi/utils.py Prevents double-wrapping of already-normalized DBAPI errors in handle_ydb_errors.
ydb_dbapi/cursors.py Refactors shared cursor state into BaseCursor and adds sync/async streaming cursors.
ydb_dbapi/connections.py Adds stream_results flag, tracks active stream cursor, and blocks commit/rollback/queries while streaming.
ydb_dbapi/init.py Exports stream cursor classes publicly.
tests/test_cursors_unit.py Adds unit tests for stream cursor buffering/rowcount semantics with fake iterators/pools.
tests/test_cursors.py Adds integration tests validating streaming behavior and transaction-session blocking.
tests/test_connections.py Minor formatting-only changes.
README.md Documents how to use streaming cursors for sync and async connections.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread ydb_dbapi/cursors.py
Comment on lines +450 to +466
def _finish_stream(self) -> None:
self._stream = None
self._release_owned_session()
self._clear_current_cursor()
self._finish_query()

@handle_ydb_errors
@invalidate_cursor_on_ydb_error
def _load_next_result_set(self) -> bool:
if self._stream is None:
return False

try:
result_set = next(self._stream)
except StopIteration:
self._finish_stream()
return False
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On normal stream exhaustion (StopIteration), the code releases the session but never explicitly closes/exits the SyncResponseContextIterator context. If the YDB iterator relies on __exit__ for cleanup (network resources, server-side cancellation, etc.), this may leak resources. Consider ensuring the iterator is always exited/closed (e.g., wrap iteration in a context manager and/or call the iterator’s close/exit equivalent inside _finish_stream, including the cancel() path).

Copilot uses AI. Check for mistakes.
Comment thread ydb_dbapi/cursors.py
Comment on lines +865 to +881
async def _finish_stream(self) -> None:
self._stream = None
await self._release_owned_session()
self._clear_current_cursor()
self._finish_query()

@handle_ydb_errors
@invalidate_cursor_on_ydb_error
async def _load_next_result_set(self) -> bool:
if self._stream is None:
return False

try:
result_set = await self._stream.__anext__()
except StopAsyncIteration:
await self._finish_stream()
return False
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the sync stream cursor: after StopAsyncIteration, the iterator is dropped and the session is released, but the AsyncResponseContextIterator is not explicitly exited/closed. If the iterator’s __aexit__ performs required cleanup, this can leave resources open. Consider consistently async with-wrapping the stream lifetime (or invoking its close/exit API) so both normal completion and error/close paths clean up deterministically.

Copilot uses AI. Check for mistakes.
Comment thread README.md
Comment on lines +40 to +41
For large result sets you can use a server-side cursor that streams result
sets incrementally instead of buffering the whole response in memory:
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs describe how to enable stream_results, but they don’t mention the key behavioral constraint enforced by the code: during interactive transactions, an active streaming cursor can block other queries and commit/rollback until the stream is fully consumed or explicitly closed (raising ProgrammingError otherwise). Consider adding a short note here (and in the async section) explaining that callers must consume all rows or call cursor.close() before issuing other operations on the same connection/transaction.

Copilot uses AI. Check for mistakes.
Comment on lines +16 to +19
def maybe_await(obj: callable) -> any:
if not iscoroutine(obj):
return obj
return await_only(obj)
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These annotations are likely unintended: callable is a built-in, not a typing annotation, and any should be typing.Any. Since this helper is passed arbitrary objects (often None), annotate it as taking object (or Any) and returning Any, and import the appropriate typing symbols.

Copilot uses AI. Check for mistakes.
def _set_current_cursor(self, cursor: AsyncStreamCursor) -> None: ...


class BaseStreamCursorTestSuit:
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class name uses TestSuit (missing 'e'); the standard spelling is TestSuite. Renaming improves readability/searchability across the test code.

Copilot uses AI. Check for mistakes.
Comment thread ydb_dbapi/connections.py
**kwargs,
)
self._current_cursor: Cursor | None = None
self._current_cursor: StreamCursor | None = None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: This parameter may hold both: Cursor and StreamCursor, and the second is the child class of the first. Therefore, we should put here either Cursor | StreamCursor | None as type or Cursor | None to avoid calls of unimplemented methods.

Comment thread ydb_dbapi/connections.py
if self._session:
self._session_pool.release(self._session)
self._session = None
self._clear_current_cursor()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: If the commit fails, for example due to TLI, we skip the clearing of the cursor. I suggest to wrap it to the try-finally block. Same for the session release by the way.

Comment thread ydb_dbapi/utils.py
async def awrapper(*args: tuple, **kwargs: dict) -> Any:
try:
return await func(*args, **kwargs)
except Error:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Are error classes the part of the dbapi interface? Is it ok to raise bare Error from the SDK or it must be DatabaseError always or something? If yes, we need to wrap bare "Error" to "DatabaseError" if occur.

Comment thread ydb_dbapi/cursors.py
self._raise_if_closed()
self._raise_if_running()
if self._tx_context is not None:
self._connection._raise_if_current_cursor_running()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why do we need this? And why do we call the private method of the connection?

Comment thread ydb_dbapi/cursors.py
def _invalidate_active_session(self) -> None:
self._clear_current_cursor()
if self._session_owner is None:
self._connection._invalidate_session()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nipick: Let's maybe make this method public?

Comment thread ydb_dbapi/cursors.py
settings = self._get_request_settings()

def callee() -> SyncResponseContextIterator:
acquire_timeout = getattr(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Why not just self._retry_settings.max_session_acquire_timeout?

Comment thread ydb_dbapi/cursors.py
)
session = self._session_pool.acquire(timeout=acquire_timeout)
try:
stream = session.transaction(self._tx_mode).execute(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why do you create a transaction here? Why not just "execute"? Although, the transacton method returns a context manager, so you need to use "with" block.

Copy link
Copy Markdown
Collaborator

@LuckySting LuckySting left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: I see formatting changes in the PR, but we didn’t update the linter config. Does this mean our CI linters currently allow multiple formatting styles? If so, let’s tighten the CI settings so only a single formatting standard passes the checks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Server-side streaming cursors

3 participants