Server-side streaming cursors #36

@vgvoleg

Motivation

The current Cursor / AsyncCursor implementation buffers the full
result of a query in memory before fetchone / fetchmany / fetchall
can be called. For large result sets (analytical queries, full-table
scans, ETL-style jobs) this is either infeasible or prohibitively
memory-hungry.

The YDB Query API exposes result sets as a stream
(SyncResponseContextIterator / AsyncResponseContextIterator) —
result sets arrive incrementally over the wire. A DB-API cursor that
consumes that stream lazily would let users process arbitrarily large
results with bounded memory.
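
To illustrate the lazy-consumption idea, here is a minimal sketch of a cursor that holds only the current chunk of rows and pulls the next one on demand. LazyCursor and fake_stream are hypothetical names used for illustration; fake_stream stands in for the driver's result-set iterator and does not reproduce its real interface.

```python
from collections import deque
from typing import Any, Iterable, Iterator, Optional

Row = tuple[Any, ...]


class LazyCursor:
    """Hypothetical sketch: pulls one result-set chunk at a time from a stream."""

    def __init__(self, stream: Iterable[list[Row]]) -> None:
        self._stream: Iterator[list[Row]] = iter(stream)
        self._buffer: deque = deque()  # rows of the current chunk only

    def fetchone(self) -> Optional[Row]:
        # Refill from the stream only when the local buffer is empty.
        while not self._buffer:
            chunk = next(self._stream, None)
            if chunk is None:
                return None  # stream exhausted
            self._buffer.extend(chunk)
        return self._buffer.popleft()

    def fetchmany(self, size: int = 1) -> list[Row]:
        rows: list[Row] = []
        # Short-circuit on size so no extra row is consumed from the stream.
        while len(rows) < size and (row := self.fetchone()) is not None:
            rows.append(row)
        return rows


def fake_stream(total_rows: int, chunk_size: int) -> Iterator[list[Row]]:
    """Stand-in for a streamed result: yields chunks lazily, never the full set."""
    for start in range(0, total_rows, chunk_size):
        yield [(i,) for i in range(start, min(start + chunk_size, total_rows))]
```

In this sketch, peak memory is bounded by the chunk size rather than by the total number of rows, which is the property the proposal is after.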

Downstream use case: SQLAlchemy

SQLAlchemy has first-class support for server-side cursors via:

  • Connection.execution_options(stream_results=True)
  • Query.yield_per(N) / select(...).execution_options(yield_per=N)

For these to work, the DB-API driver must expose a cursor that fetches
from the server incrementally rather than materialising everything up
front. Without a streaming cursor on our side, SQLAlchemy users can't
use yield_per / stream_results against YDB and have to either page
manually or blow up memory.

Equivalents in other drivers:

  • psycopg2 — named (server-side) cursors: conn.cursor(name="...")
  • psycopg (v3) — conn.cursor(name="...") returns a ServerCursor
    instead of the default client-side Cursor
  • asyncpg — cursor objects returned from conn.cursor(query) inside
    a transaction

Proposed API

Expose a streaming variant through an extra kwarg on Connection.cursor:

with connection.cursor(stream_results=True) as cur:
    cur.execute("SELECT ... FROM huge_table")
    for row in iter(cur.fetchone, None):
        ...

And for async:

async with async_connection.cursor(stream_results=True) as cur:
    await cur.execute("SELECT ... FROM huge_table")
    while (row := await cur.fetchone()) is not None:
        ...

New public classes: StreamCursor, AsyncStreamCursor, exported from
ydb_dbapi.
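
A rough sketch of what the async variant could look like, to make the fetch / rowcount / close interplay concrete. This is not the ydb_dbapi implementation: AsyncStreamCursorSketch and fake_async_stream are hypothetical, the stream is modelled as a plain async iterator of row chunks, and reporting the fetched-row total once drained is an assumption (the issue only suggests -1 until drained).

```python
import asyncio
from collections import deque
from typing import Any, AsyncIterator, Optional

Row = tuple[Any, ...]


class AsyncStreamCursorSketch:
    """Hypothetical shape only; not the real AsyncStreamCursor."""

    def __init__(self, stream: AsyncIterator[list[Row]]) -> None:
        self._stream = stream
        self._buffer: deque = deque()
        self._fetched = 0
        self._drained = False
        self._closed = False

    @property
    def rowcount(self) -> int:
        # Assumption: unknown (-1) until the stream has been fully drained.
        return self._fetched if self._drained else -1

    async def fetchone(self) -> Optional[Row]:
        if self._closed:
            raise RuntimeError("cursor is closed")
        # Pull the next chunk only when the local buffer is empty.
        while not self._buffer and not self._drained:
            try:
                chunk = await self._stream.__anext__()
            except StopAsyncIteration:
                self._drained = True
                break
            self._buffer.extend(chunk)
        if not self._buffer:
            return None
        self._fetched += 1
        return self._buffer.popleft()

    async def close(self) -> None:
        # Terminate a mid-flight stream; async generators expose aclose().
        if self._closed:
            return
        self._closed = True
        self._buffer.clear()
        aclose = getattr(self._stream, "aclose", None)
        if aclose is not None:
            await aclose()

    async def __aenter__(self) -> "AsyncStreamCursorSketch":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close()


async def fake_async_stream(chunks: list[list[Row]]) -> AsyncIterator[list[Row]]:
    """Stand-in for a streamed result; yields one chunk per iteration."""
    for chunk in chunks:
        await asyncio.sleep(0)  # simulate waiting on the network
        yield chunk


async def demo() -> tuple[list[Row], int, int]:
    stream = fake_async_stream([[(1,), (2,)], [(3,)]])
    async with AsyncStreamCursorSketch(stream) as cur:
        before = cur.rowcount
        rows = []
        while (row := await cur.fetchone()) is not None:
            rows.append(row)
        return rows, before, cur.rowcount
```

A real implementation would also have to release or discard the underlying session in close(); the sketch only closes the iterator.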

Scope

  • StreamCursor (sync) consuming SyncResponseContextIterator
  • AsyncStreamCursor consuming AsyncResponseContextIterator
  • Own-session mode (auto-commit, no interactive tx): cursor acquires
    a session from the pool for the duration of the stream and
    releases on finish / close / error
  • Interactive-tx mode: stream runs on the connection's tx session,
    with exclusivity — while a stream is active no other cursor on
    the same connection may execute (would corrupt the tx session);
    commit / rollback should reject while a stream is running
  • rowcount semantics for streaming (likely -1 until drained)
  • close() must cleanly terminate a mid-flight stream (cancel +
    discard session, or drain) in both sync and async paths
  • Integration tests against a local YDB (see
    .github/docker/docker-compose.yml)
  • Unit tests with fake session/stream pools
  • README documentation for the new flag + both code examples
  • SQLAlchemy dialect wiring for stream_results /
    yield_per — likely a follow-up in ydb-sqlalchemy, but worth
    mentioning here
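
The interactive-tx exclusivity rule above could be enforced with a small guard on the connection. The following is only a sketch under assumptions: TxConnectionSketch, StreamInProgressError, begin_stream and end_stream are hypothetical names, and the real driver may prefer one of the standard DB-API exception classes.

```python
class StreamInProgressError(RuntimeError):
    """Hypothetical error for operations rejected while a stream is active."""


class TxConnectionSketch:
    """Hypothetical connection tracking at most one active stream per tx session."""

    def __init__(self) -> None:
        self._active_stream = None  # the cursor currently streaming, if any

    def begin_stream(self, cursor: object) -> None:
        # Called by a cursor before it starts streaming on the tx session.
        if self._active_stream is not None:
            raise StreamInProgressError(
                "another cursor is streaming on this tx session"
            )
        self._active_stream = cursor

    def end_stream(self, cursor: object) -> None:
        # Called on finish / close / error; only the owning cursor releases.
        if self._active_stream is cursor:
            self._active_stream = None

    def _ensure_idle(self, action: str) -> None:
        if self._active_stream is not None:
            raise StreamInProgressError(
                f"cannot {action} while a stream is running"
            )

    def commit(self) -> None:
        self._ensure_idle("commit")
        # A real commit on the tx session would go here.

    def rollback(self) -> None:
        self._ensure_idle("rollback")
        # A real rollback on the tx session would go here.
```

Keeping the guard on the connection rather than on the cursor means every cursor, streaming or buffered, goes through the same check before touching the tx session.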
