Skip to content

Commit 81f7236

Browse files
rustyconoverclaude
andcommitted
streaming-partitioned aggregate protocol
Add a third execution path for VGI aggregate functions, alongside the existing GROUP BY (update/combine/finalize) and windowed (window/_init/ _batch) paths. Functions opt in via Meta.streaming_partitioned=True; the VGI DuckDB extension's optimizer rule replaces eligible LogicalWindow nodes with a custom streaming operator that pipes input chunks straight to the worker — no DuckDB-side partition materialisation. Three new classmethod hooks on AggregateFunction: streaming_open(params) -> StreamingState streaming_chunk(chunk, state, partition_key_count, order_key_count, params) -> pa.Array streaming_close(state, params) -> None The worker holds concurrent per-partition state in a hash map keyed by partition tuple; each input row updates its partition's state and emits a snapshot. Memory is bounded by partitions × per-partition-state, not by row count — the structural answer to "running aggregate over unbounded ordered input." Wire protocol: three new unary RPCs (aggregate_streaming_open, aggregate_streaming_chunk, aggregate_streaming_close), all carrying the standard {request: binary} envelope shape. Session state is held in an in-process LRU cache for the fast path and persisted to FunctionStorage (under the existing aggregate_window_partition_put key) so chunk RPCs landing on a different worker pool entry can rehydrate correctly. Same affinity pattern as the windowed path. Eligibility (enforced by the C++ optimizer rule, not this change): cumulative frame only, no EXCLUDE/DISTINCT/FILTER/arg-orders, no const-arg parameters in v1. Queries that don't satisfy fall back to the standard windowed path; the streaming path is additive, not a replacement. Documented in docs/aggregate-functions.md — including a note that pre-aggregation (GROUP BY ... + OVER) is the right pattern for most analytics shapes; the streaming path's unique value is for shapes where pre-aggregation isn't algebraically valid (per-fill running views, very high cardinality, future continuous feeds). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a6e87d3 commit 81f7236

6 files changed

Lines changed: 615 additions & 0 deletions

File tree

docs/aggregate-functions.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,74 @@ worker = Worker(
286286

287287
The framework automatically detects `AggregateFunction` subclasses and registers them with the correct function type in the catalog.
288288

289+
## Streaming-Partitioned Variant
290+
291+
For `OVER (PARTITION BY ... ORDER BY ...)` queries against unbounded inputs (e.g. running aggregates across years of trade history), the standard windowed path materializes each partition in DuckDB memory before the aggregate sees it — fine for bounded data, OOMs at scale.
292+
293+
The `streaming_partitioned` opt-in routes those queries through a custom physical operator in the VGI DuckDB extension: input chunks pipe directly to the worker, the worker maintains concurrent per-partition state in a hash map keyed by partition tuple, and each input chunk produces a same-length output array of cumulative snapshots. No DuckDB-side partition materialization; memory is bounded by `partitions × state_per_partition`, not by row count.
294+
295+
```python
296+
class MyRunningAgg(AggregateFunction[MyState]):
297+
class Meta:
298+
name = "my_running_agg"
299+
streaming_partitioned = True # opt-in
300+
# supports_window may also be set; the optimizer chooses the
301+
# streaming path for eligible queries and falls back to the
302+
# windowed path otherwise.
303+
304+
@classmethod
305+
def streaming_open(cls, params: ProcessParams[None]) -> dict[str, Any]:
306+
# Build cross-partition session state. Returned object lives in
307+
# an in-process cache for the duration of the session and is
308+
# also persisted to FunctionStorage so chunk RPCs landing on a
309+
# different pool worker can rehydrate.
310+
return {"partition_states": {}}
311+
312+
@classmethod
313+
def streaming_chunk(
314+
cls,
315+
chunk: pa.RecordBatch,
316+
streaming_state: dict[str, Any],
317+
partition_key_count: int,
318+
order_key_count: int,
319+
params: ProcessParams[None],
320+
) -> pa.Array:
321+
# Column layout in `chunk`:
322+
# [partition_key_cols..., order_key_cols..., value_cols...]
323+
# Return one output value per input row (cumulative snapshot
324+
# at that row's position in its partition's order).
325+
...
326+
327+
@classmethod
328+
def streaming_close(cls, streaming_state, params) -> None:
329+
# Cleanup hook (called once per session). Default: no-op.
330+
...
331+
```
332+
333+
**Eligibility for the streaming path** is decided by the extension's optimizer rule and requires:
334+
335+
- `streaming_partitioned = True` on the function's Meta.
336+
- A cumulative frame: `ROWS/RANGE/GROUPS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` (or the implicit cumulative frame DuckDB emits when only `ORDER BY` is given).
337+
- No `EXCLUDE`, `DISTINCT`, `FILTER (WHERE ...)`, or aggregate-arg `ORDER BY`.
338+
- The worker function declares no const-arg parameters (v1 limitation).
339+
340+
Queries that don't satisfy all of these fall back to the standard windowed path automatically. The streaming path is opt-in and additive — it does not replace `update`/`combine`/`finalize`, which still service `GROUP BY` queries normally.
341+
342+
**When pre-aggregation is the better answer.** For most analytics shapes — "EOD positions per book per day, carrying forward across days" — pre-aggregating the input is the cleanest pattern in plain SQL:
343+
344+
```sql
345+
WITH per_period_net AS (
346+
SELECT book, period_key, symbol, SUM(quantity) AS quantity
347+
FROM trades GROUP BY book, period_key, symbol
348+
)
349+
SELECT book, period_key,
350+
my_running_agg(symbol, quantity)
351+
OVER (PARTITION BY book ORDER BY period_key) AS running
352+
FROM per_period_net;
353+
```
354+
355+
The pre-aggregate collapses fills within each period before the OVER sees them, so the per-row output cardinality of the OVER matches the user's actual intent. The streaming path is the right tool when pre-aggregation isn't viable: per-fill running views, very high symbol cardinality per partition, or aggregates whose state isn't algebraically reducible by a pre-aggregate.
356+
289357
## Example Functions
290358

291359
See `vgi/examples/aggregate.py` for complete implementations:

vgi/aggregate_function.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,3 +515,90 @@ def window_batch(
515515
cls.window(rid, frames, partition, window_state, params)
516516
for rid, frames in zip(row_ids, subframes, strict=True)
517517
]
518+
519+
# ------------------------------------------------------------------
520+
# Optional streaming-partitioned callbacks
521+
# ------------------------------------------------------------------
522+
# Enable by setting ``Meta.streaming_partitioned = True`` and overriding
523+
# ``streaming_chunk()`` (and optionally ``streaming_open`` /
524+
# ``streaming_close``).
525+
#
526+
# Streaming-partitioned aggregates handle queries shaped like
527+
# ``f(...) OVER (PARTITION BY p ORDER BY o)`` with a cumulative frame
528+
# (``UNBOUNDED PRECEDING -> CURRENT ROW``) where the input is too large
529+
# to materialise in DuckDB memory but compresses heavily into per-
530+
# partition state. The framework streams input chunks to the worker;
531+
# the worker maintains concurrent per-partition state in a hash map and
532+
# emits one output row per input row.
533+
534+
@classmethod
535+
def streaming_open(cls, params: ProcessParams[Any]) -> Any:
536+
"""Build cross-partition global state for a streaming session.
537+
538+
Called once when ``aggregate_streaming_open`` arrives, before any
539+
chunk is processed. Return any object (it lives in an in-process
540+
cache keyed by ``execution_id`` for the duration of the session).
541+
542+
Typical contents: a ``dict`` of per-partition aggregate states
543+
(populated lazily as new partition keys appear in input chunks),
544+
plus any cross-partition resources to share — symbol intern
545+
tables, allocator pools, prepared output buffers.
546+
547+
Default implementation returns ``None`` (no shared state); the
548+
function still works if ``streaming_chunk`` keeps everything in
549+
local variables, but per-partition state would have to live
550+
somewhere caller-supplied.
551+
"""
552+
return None
553+
554+
@classmethod
555+
def streaming_chunk(
556+
cls,
557+
chunk: pa.RecordBatch,
558+
streaming_state: Any,
559+
partition_key_count: int,
560+
order_key_count: int,
561+
params: ProcessParams[Any],
562+
) -> "pa.Array | list[Any]":
563+
"""Process one chunk of streaming input.
564+
565+
Args:
566+
chunk: Input rows for this batch. Schema layout is
567+
``[partition_key_cols..., order_key_cols..., value_cols...]``
568+
— the first ``partition_key_count`` columns are partition
569+
keys (used to dispatch to the right per-partition state),
570+
the next ``order_key_count`` are order keys (informational;
571+
may be used to verify monotonicity), the rest are the
572+
function's value arguments in declaration order.
573+
streaming_state: Whatever ``streaming_open`` returned. The
574+
framework passes the same object on every chunk; mutate
575+
in place to accumulate state across chunks.
576+
partition_key_count: Number of leading columns that form the
577+
partition key.
578+
order_key_count: Number of columns following the partition key
579+
that form the order key.
580+
params: Shared ``ProcessParams``.
581+
582+
Returns:
583+
Either a :class:`pa.Array` of length ``chunk.num_rows`` matching
584+
the function's output type, or a list of the same length
585+
(which the framework converts via ``pa.array``). Each output
586+
value is the cumulative aggregate snapshot at that input
587+
row's position in its partition's order.
588+
"""
589+
raise NotImplementedError(
590+
f"{cls.__name__}: Meta.streaming_partitioned=True requires overriding streaming_chunk()"
591+
)
592+
593+
@classmethod
594+
def streaming_close(cls, streaming_state: Any, params: ProcessParams[Any]) -> None:
595+
"""Tear down streaming session state.
596+
597+
Called once when ``aggregate_streaming_close`` arrives, after the
598+
last chunk. Use to release any external resources held by
599+
``streaming_state``. The framework drops its reference after this
600+
call, so anything not held elsewhere is GCed naturally.
601+
602+
Default implementation is a no-op.
603+
"""
604+
return None

vgi/catalog/catalog_interface.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,11 @@ class FunctionInfo(CatalogSchemaObject, ArrowSerializableDataclass):
367367
distinct_dependent: DistinctDependence = DistinctDependence.NOT_DISTINCT_DEPENDENT
368368
# True if the aggregate implements the window() callback
369369
supports_window: bool = False
370+
# True if the aggregate opts into the streaming-partitioned protocol —
371+
# ``aggregate_streaming_open`` / ``_chunk`` / ``_close``. The DuckDB
372+
# extension's optimizer rule may rewrite eligible LogicalWindow nodes to
373+
# use this path.
374+
streaming_partitioned: bool = False
370375

371376
# True if a table-in-out function declares a finalize/finish stage.
372377
# The C++ extension uses this to conditionally register
@@ -2133,6 +2138,7 @@ def _function_to_info(self, func_cls: type, schema_name: str) -> FunctionInfo:
21332138
order_dependent=meta.order_dependent,
21342139
distinct_dependent=meta.distinct_dependent,
21352140
supports_window=meta.supports_window,
2141+
streaming_partitioned=meta.streaming_partitioned,
21362142
has_finalize=meta.has_finalize,
21372143
# Settings
21382144
required_settings=meta.required_settings,

vgi/metadata.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ class ResolvedMetadata:
328328
order_dependent: OrderDependence = OrderDependence.NOT_ORDER_DEPENDENT
329329
distinct_dependent: DistinctDependence = DistinctDependence.NOT_DISTINCT_DEPENDENT
330330
supports_window: bool = False
331+
streaming_partitioned: bool = False
331332

332333
# Table-in-out specific: True if the function has a meaningful finalize phase
333334
# (override of finalize()/finish()). Used by the C++ extension to decide
@@ -359,6 +360,7 @@ def to_dict(self) -> dict[str, Any]:
359360
"order_dependent": self.order_dependent.name,
360361
"distinct_dependent": self.distinct_dependent.name,
361362
"supports_window": self.supports_window,
363+
"streaming_partitioned": self.streaming_partitioned,
362364
"has_finalize": self.has_finalize,
363365
}
364366

@@ -387,6 +389,7 @@ def from_dict(d: dict[str, Any]) -> ResolvedMetadata:
387389
order_dependent=OrderDependence[d.get("order_dependent", "NOT_ORDER_DEPENDENT")],
388390
distinct_dependent=DistinctDependence[d.get("distinct_dependent", "NOT_DISTINCT_DEPENDENT")],
389391
supports_window=d.get("supports_window", False),
392+
streaming_partitioned=d.get("streaming_partitioned", False),
390393
has_finalize=d.get("has_finalize", False),
391394
)
392395

@@ -777,6 +780,7 @@ def _normalize_examples(
777780
"order_dependent",
778781
"distinct_dependent",
779782
"supports_window",
783+
"streaming_partitioned",
780784
# Scalar function specific
781785
"output_type", # pa.DataType | type[AnyArrow] for scalar functions
782786
}
@@ -941,6 +945,7 @@ def resolve_metadata(cls: type) -> ResolvedMetadata:
941945
order_dependent=attrs.get("order_dependent", OrderDependence.NOT_ORDER_DEPENDENT),
942946
distinct_dependent=attrs.get("distinct_dependent", DistinctDependence.NOT_DISTINCT_DEPENDENT),
943947
supports_window=bool(attrs.get("supports_window", False)),
948+
streaming_partitioned=bool(attrs.get("streaming_partitioned", False)),
944949
has_finalize=_detect_has_finalize(cls, function_type),
945950
)
946951

@@ -1028,6 +1033,7 @@ def _detect_has_finalize(cls: type, function_type: CatalogFunctionType) -> bool:
10281033
pa.field("order_dependent", pa.string()),
10291034
pa.field("distinct_dependent", pa.string()),
10301035
pa.field("supports_window", pa.bool_()),
1036+
pa.field("streaming_partitioned", pa.bool_()),
10311037
pa.field("has_finalize", pa.bool_()),
10321038
]
10331039
)

vgi/protocol.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,93 @@ class AggregateWindowBatchResponse(ArrowSerializableDataclass):
10571057
result_batch: bytes # Full IPC stream bytes (count rows, output schema)
10581058

10591059

1060+
# ---------------------------------------------------------------------------
1061+
# Aggregate Streaming-Partitioned RPC Types
1062+
# ---------------------------------------------------------------------------
1063+
# Streaming protocol for partitioned aggregates whose state compresses
1064+
# heavily relative to input rows (e.g. portfolio_agg's positions dict vs
1065+
# millions of fills). DuckDB streams input chunks to the worker; the worker
1066+
# maintains concurrent per-partition state in a hash map keyed by partition
1067+
# key, dispatches each row to its partition's state, and emits one snapshot
1068+
# per input row. No DuckDB-side partition materialisation. Cumulative
1069+
# semantics only (UNBOUNDED PRECEDING -> CURRENT ROW); other frame shapes
1070+
# fall back to the non-streaming path.
1071+
1072+
1073+
@dataclass(frozen=True, slots=True, kw_only=True)
1074+
class AggregateStreamingOpenRequest(ArrowSerializableDataclass):
1075+
"""Request for aggregate_streaming_open — start a streaming session.
1076+
1077+
The worker resolves the function, calls ``streaming_open`` to build the
1078+
cross-partition global state, and returns an ``execution_id`` that
1079+
subsequent chunk/close calls reference.
1080+
1081+
``input_schema`` is the schema of every chunk shipped via
1082+
``streaming_chunk``. The first ``partition_key_count`` columns are
1083+
partition-key columns (used by the worker to dispatch rows to the right
1084+
per-partition state). The next ``order_key_count`` columns are
1085+
order-key columns (informational; the worker may verify monotonicity).
1086+
Remaining columns are the function's value arguments, in declaration
1087+
order.
1088+
"""
1089+
1090+
function_name: str
1091+
arguments: Annotated[Arguments, ArrowType(pa.binary())]
1092+
input_schema: Annotated[pa.Schema, ArrowType(pa.binary())]
1093+
partition_key_count: int
1094+
order_key_count: int
1095+
output_schema: Annotated[pa.Schema, ArrowType(pa.binary())]
1096+
settings: Annotated[pa.RecordBatch | None, ArrowType(pa.binary())] = None
1097+
secrets: Annotated[pa.RecordBatch | None, ArrowType(pa.binary())] = None
1098+
attach_id: bytes | None = None
1099+
1100+
1101+
@dataclass(frozen=True, slots=True, kw_only=True)
1102+
class AggregateStreamingOpenResponse(ArrowSerializableDataclass):
1103+
"""Response from aggregate_streaming_open — session token."""
1104+
1105+
execution_id: bytes
1106+
1107+
1108+
@dataclass(frozen=True, slots=True, kw_only=True)
1109+
class AggregateStreamingChunkRequest(ArrowSerializableDataclass):
1110+
"""Request for aggregate_streaming_chunk — process one input chunk.
1111+
1112+
``input_batch`` schema must match the ``input_schema`` agreed at
1113+
``streaming_open``. The worker iterates rows, dispatches to per-partition
1114+
state by the partition-key columns, applies the function's update logic,
1115+
and returns a same-length output array.
1116+
"""
1117+
1118+
function_name: str
1119+
execution_id: bytes
1120+
input_batch: bytes # Full IPC stream bytes
1121+
attach_id: bytes | None = None
1122+
1123+
1124+
@dataclass(frozen=True, slots=True, kw_only=True)
1125+
class AggregateStreamingChunkResponse(ArrowSerializableDataclass):
1126+
"""Response from aggregate_streaming_chunk — same-length output batch."""
1127+
1128+
result_batch: bytes # Full IPC stream bytes (one row per input row)
1129+
1130+
1131+
@dataclass(frozen=True, slots=True, kw_only=True)
1132+
class AggregateStreamingCloseRequest(ArrowSerializableDataclass):
1133+
"""Request for aggregate_streaming_close — end the session, free state."""
1134+
1135+
function_name: str
1136+
execution_id: bytes
1137+
attach_id: bytes | None = None
1138+
1139+
1140+
@dataclass(frozen=True, slots=True, kw_only=True)
1141+
class AggregateStreamingCloseResponse(ArrowSerializableDataclass):
1142+
"""Response from aggregate_streaming_close — empty ack."""
1143+
1144+
pass
1145+
1146+
10601147
# ---------------------------------------------------------------------------
10611148
# VGI Protocol
10621149
# ---------------------------------------------------------------------------
@@ -1138,6 +1225,26 @@ def aggregate_window_batch(self, request: AggregateWindowBatchRequest) -> Aggreg
11381225
"""Compute ``count`` window output rows in one batched RPC."""
11391226
...
11401227

1228+
# ========== Aggregate Streaming-Partitioned Methods (optional, all unary) ==========
1229+
1230+
def aggregate_streaming_open(
1231+
self, request: AggregateStreamingOpenRequest
1232+
) -> AggregateStreamingOpenResponse:
1233+
"""Start a streaming-partitioned aggregate session."""
1234+
...
1235+
1236+
def aggregate_streaming_chunk(
1237+
self, request: AggregateStreamingChunkRequest
1238+
) -> AggregateStreamingChunkResponse:
1239+
"""Process one input chunk; returns one output row per input row."""
1240+
...
1241+
1242+
def aggregate_streaming_close(
1243+
self, request: AggregateStreamingCloseRequest
1244+
) -> AggregateStreamingCloseResponse:
1245+
"""End the streaming session, free per-session state."""
1246+
...
1247+
11411248
# ========== Catalog - Discovery ==========
11421249

11431250
def catalog_catalogs(self) -> CatalogsResponse:

0 commit comments

Comments
 (0)