
Server-side streaming cursors #36

@vgvoleg

Description

Motivation

The current Cursor / AsyncCursor implementation buffers the full
result of a query in memory before fetchone / fetchmany / fetchall
can be called. For large result sets (analytical queries, full-table
scans, ETL-style jobs) this is either infeasible or prohibitively
memory-hungry.

The YDB Query API exposes result sets as a stream
(SyncResponseContextIterator / AsyncResponseContextIterator) —
result sets arrive incrementally over the wire. A DB-API cursor that
consumes that stream lazily would let users process arbitrarily large
results with bounded memory.
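To illustrate the difference between buffering and lazy consumption, here is a minimal sketch against a simulated stream. `FakeResultSet` and `server_stream` are hypothetical stand-ins for the SDK's response iterator. The assumption that each part exposes its rows as `.rows` is for illustration only.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, List

@dataclass
class FakeResultSet:
    # Hypothetical stand-in for one result-set part arriving over the wire;
    # its rows are assumed to be exposed as a `.rows` list.
    rows: List[Any]

def server_stream(n_parts: int, rows_per_part: int, log: List[int]) -> Iterator[FakeResultSet]:
    # Simulated server stream: each part is produced only when requested,
    # and `log` records how many parts the consumer has pulled so far.
    for i in range(n_parts):
        log.append(i)
        yield FakeResultSet(rows=[(i, j) for j in range(rows_per_part)])

def buffered_rows(parts: Iterable[FakeResultSet]) -> List[Any]:
    # Current behaviour: drain every part before the first row is available.
    return [row for part in parts for row in part.rows]

def lazy_rows(parts: Iterable[FakeResultSet]) -> Iterator[Any]:
    # Streaming behaviour: request the next part only once the previous one is used up.
    for part in parts:
        yield from part.rows

pulled: List[int] = []
rows = lazy_rows(server_stream(1000, 100, pulled))
print(next(rows), len(pulled))  # (0, 0) 1
```

Only one of the 1000 simulated parts has been pulled when the first row is returned, so memory use is bounded by the part size rather than the result size.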

Downstream use case: SQLAlchemy

SQLAlchemy has first-class support for server-side cursors via:

  • Connection.execution_options(stream_results=True)
  • Query.yield_per(N) / select(...).execution_options(yield_per=N)

For these to work, the DB-API driver must expose a cursor that fetches
from the server incrementally rather than materialising everything up
front. Without a streaming cursor on our side, SQLAlchemy users can't
use yield_per / stream_results against YDB and have to either page
manually or hold the entire result set in memory.
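Roughly speaking, batched consumption such as yield_per comes down to repeated `fetchmany(N)` calls on the DB-API cursor. The sketch below shows that loop. The stdlib `sqlite3` driver is used purely as a runnable stand-in cursor (it is not a streaming YDB cursor), and `iter_batches` is a hypothetical helper, not SQLAlchemy's API.

```python
import sqlite3
from typing import Any, Iterator, List

def iter_batches(cursor: Any, size: int) -> Iterator[List[Any]]:
    # Pull at most `size` rows per call. With a truly streaming cursor only
    # this batch (plus whatever the driver buffers) needs to be in memory.
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            break
        yield batch

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE huge_table (id INTEGER)")
conn.executemany("INSERT INTO huge_table VALUES (?)", [(i,) for i in range(10)])
cur = conn.execute("SELECT id FROM huge_table ORDER BY id")
sizes = [len(batch) for batch in iter_batches(cur, 4)]
print(sizes)  # [4, 4, 2]
```

With today's buffered cursor the same loop still works, but every row is already in memory before the first `fetchmany` returns.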

Equivalents in other drivers:

  • psycopg2: named (server-side) cursors, conn.cursor(name="...")
  • psycopg (v3): conn.cursor(name="...") returns a ServerCursor
    instead of the default client-side Cursor
  • asyncpg: cursor objects returned from conn.cursor(query) inside
    a transaction

Proposed API

Expose a streaming variant through an extra kwarg on Connection.cursor:

with connection.cursor(stream_results=True) as cur:
    cur.execute("SELECT ... FROM huge_table")
    for row in iter(cur.fetchone, None):
        ...
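The sketch below shows one way a sync streaming cursor could satisfy fetchone / fetchmany / fetchall over a stream of result-set parts. `StreamCursorSketch` and `open_stream` are hypothetical names. The stream is a plain Python iterator rather than the SDK's SyncResponseContextIterator, and error types are simplified.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional

_END = object()  # private sentinel so a row can never be mistaken for "no more rows"

class StreamCursorSketch:
    """Hypothetical sync streaming cursor; rows are pulled on demand."""

    def __init__(self, open_stream: Callable[[str], Iterable[Iterable[Any]]]) -> None:
        # `open_stream(query)` stands in for executing a query and receiving an
        # iterator of result-set parts, each an iterable of rows (an assumption).
        self._open_stream = open_stream
        self._parts: Optional[Iterator[Iterable[Any]]] = None
        self._rows: Optional[Iterator[Any]] = None
        self._seen = 0
        self.rowcount = -1  # unknown until the stream has been drained
        self.arraysize = 1

    def execute(self, query: str) -> None:
        self.close()  # abandon any previous stream first
        self._parts = iter(self._open_stream(query))
        self._rows = (row for part in self._parts for row in part)
        self._seen = 0
        self.rowcount = -1

    def fetchone(self) -> Optional[Any]:
        if self._rows is None:
            raise RuntimeError("execute() has not been called")  # DB-API: ProgrammingError
        row = next(self._rows, _END)
        if row is _END:
            self.rowcount = self._seen  # stream drained: count is now known
            return None
        self._seen += 1
        return row

    def fetchmany(self, size: Optional[int] = None) -> List[Any]:
        size = self.arraysize if size is None else size
        batch: List[Any] = []
        while len(batch) < size and (row := self.fetchone()) is not None:
            batch.append(row)
        return batch

    def fetchall(self) -> List[Any]:
        return list(iter(self.fetchone, None))

    def close(self) -> None:
        # Stop a mid-flight stream; a real driver would cancel the server-side
        # stream here and release or discard the session.
        if self._rows is not None:
            self._rows.close()
        close_parts = getattr(self._parts, "close", None)
        if close_parts is not None:
            close_parts()
        self._parts = self._rows = None

    def __enter__(self) -> "StreamCursorSketch":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()

def fake_open_stream(query: str) -> Iterator[List[tuple]]:
    # Two result-set parts, as if they arrived from the server one after another.
    yield [(1,), (2,)]
    yield [(3,)]

with StreamCursorSketch(fake_open_stream) as cur:
    cur.execute("SELECT ... FROM huge_table")
    print(list(iter(cur.fetchone, None)), cur.rowcount)  # [(1,), (2,), (3,)] 3
```

Here rowcount stays -1 until the last part has been consumed, matching the "likely -1 until drained" item in the scope. close() explicitly closes the underlying part iterator because closing the flattening generator alone would not.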

And for async:

async with async_connection.cursor(stream_results=True) as cur:
    await cur.execute("SELECT ... FROM huge_table")
    while (row := await cur.fetchone()) is not None:
        ...
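An async counterpart might look like the following sketch. `AsyncStreamCursorSketch` and `open_stream` are hypothetical, and an async generator stands in for the SDK's AsyncResponseContextIterator. Only one result-set part is held in memory at a time.

```python
import asyncio
from collections import deque
from typing import Any, AsyncIterator, Callable, Deque, List, Optional

class AsyncStreamCursorSketch:
    """Hypothetical async streaming cursor; buffers at most one part at a time."""

    def __init__(self, open_stream: Callable[[str], AsyncIterator[List[Any]]]) -> None:
        # `open_stream(query)` stands in for executing a query and receiving an
        # async iterator of result-set parts (an assumption for this sketch).
        self._open_stream = open_stream
        self._parts: Optional[AsyncIterator[List[Any]]] = None
        self._buffer: Deque[Any] = deque()
        self._exhausted = False
        self._seen = 0
        self.rowcount = -1  # unknown until drained

    async def execute(self, query: str) -> None:
        await self.close()  # abandon any previous stream first
        self._parts = self._open_stream(query)
        self._exhausted = False
        self._seen = 0
        self.rowcount = -1

    async def fetchone(self) -> Optional[Any]:
        if self._parts is None:
            raise RuntimeError("execute() has not been called")  # DB-API: ProgrammingError
        while not self._buffer:
            if self._exhausted:
                return None
            try:
                part = await self._parts.__anext__()  # wait for the next part
            except StopAsyncIteration:
                self._exhausted = True
                self.rowcount = self._seen
                return None
            self._buffer.extend(part)
        self._seen += 1
        return self._buffer.popleft()

    async def close(self) -> None:
        # Terminate a mid-flight stream; a real driver would cancel the server
        # stream and release or discard the session here.
        aclose = getattr(self._parts, "aclose", None)
        if aclose is not None and not self._exhausted:
            await aclose()
        self._parts = None
        self._buffer.clear()

    async def __aenter__(self) -> "AsyncStreamCursorSketch":
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.close()

async def fake_open_stream(query: str) -> AsyncIterator[List[tuple]]:
    yield [(1,), (2,)]
    await asyncio.sleep(0)  # simulate waiting for the next part to arrive
    yield [(3,)]

async def main() -> tuple:
    rows = []
    async with AsyncStreamCursorSketch(fake_open_stream) as cur:
        await cur.execute("SELECT ... FROM huge_table")
        while (row := await cur.fetchone()) is not None:
            rows.append(row)
        return rows, cur.rowcount

print(asyncio.run(main()))  # ([(1,), (2,), (3,)], 3)
```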

New public classes: StreamCursor, AsyncStreamCursor, exported from
ydb_dbapi.

Scope

  • StreamCursor (sync) consuming SyncResponseContextIterator
  • AsyncStreamCursor consuming AsyncResponseContextIterator
  • Own-session mode (auto-commit, no interactive tx): the cursor acquires
    a session from the pool for the duration of the stream and
    releases it on finish / close / error
  • Interactive-tx mode: the stream runs on the connection's tx session
    exclusively. While a stream is active, no other cursor on the same
    connection may execute (doing so would corrupt the tx session), and
    commit / rollback should be rejected while a stream is running
  • rowcount semantics for streaming (likely -1 until drained)
  • close() must cleanly terminate a mid-flight stream (cancel +
    discard session, or drain) in both sync and async paths
  • Integration tests against a local YDB (see
    .github/docker/docker-compose.yml)
  • Unit tests with fake session/stream pools
  • README documentation for the new flag + both code examples
  • SQLAlchemy dialect wiring for stream_results /
    yield_per — likely a follow-up in ydb-sqlalchemy, but worth
    mentioning here
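The two session modes from the scope list could be modelled roughly as below. `ConnectionSketch`, `FakePool` and `stream_session` are hypothetical names, and the "discard on interrupted stream, release on clean finish" policy is one of the options the scope mentions, not a settled choice. The sketch is not thread- or task-safe.

```python
from contextlib import contextmanager
from typing import Iterator, Optional

class ProgrammingError(Exception):
    """Stand-in for the DB-API ProgrammingError."""

class FakePool:
    """Hypothetical session pool that only counts what happens to sessions."""

    def __init__(self) -> None:
        self.acquired = self.released = self.discarded = 0

    def acquire(self) -> object:
        self.acquired += 1
        return object()

    def release(self, session: object) -> None:
        self.released += 1

    def discard(self, session: object) -> None:
        self.discarded += 1

class ConnectionSketch:
    """Hypothetical connection showing session ownership for streaming cursors."""

    def __init__(self, pool: FakePool, interactive_tx: bool) -> None:
        self._pool = pool
        self._tx_session: Optional[object] = object() if interactive_tx else None
        self._active_stream: Optional[object] = None  # cursor streaming on the tx session

    @contextmanager
    def stream_session(self, cursor: object) -> Iterator[object]:
        if self._tx_session is not None:
            # Interactive-tx mode: borrow the tx session exclusively.
            self._check_no_stream()
            self._active_stream = cursor
            try:
                yield self._tx_session
            finally:
                self._active_stream = None
        else:
            # Own-session mode: a pool session lives exactly as long as the stream.
            session = self._pool.acquire()
            try:
                yield session
            except BaseException:
                self._pool.discard(session)  # stream may be mid-flight: don't reuse it
                raise
            else:
                self._pool.release(session)

    def _check_no_stream(self) -> None:
        # Also what a regular cursor's execute() would call on this connection.
        if self._active_stream is not None:
            raise ProgrammingError("a stream is still running on this connection")

    def commit(self) -> None:
        self._check_no_stream()

    def rollback(self) -> None:
        self._check_no_stream()
```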
