Skip to content

Commit 67a5d6c

Browse files
rustyconover and claude
committed
test(fixtures): add typed_probe cross-language table fixture
typed_probe binds typed const args (TIMESTAMPTZ/INTERVAL/BLOB/UBIGINT, each with a default) and echoes them into uint64/int64/blob/double columns. Values are normalized (epoch micros, interval→ms collapse matching vgi-go's GetScalarDuration) so this and the vgi-go fixture produce byte-identical output for the shared integration test. Handles both timedelta defaults and MonthDayNano INTERVAL literals. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent d9553d6 commit 67a5d6c

3 files changed

Lines changed: 162 additions & 0 deletions

File tree

vgi/_test_fixtures/table/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
SpatialFilterExampleFunction,
4949
ValuePruneFunction,
5050
)
51+
from vgi._test_fixtures.table.typed_probe import TypedProbeFunction
5152
from vgi._test_fixtures.table.late_materialization import (
5253
LateMaterializationFunction,
5354
)
@@ -137,6 +138,7 @@
137138
)
138139

139140
__all__ = [
141+
"TypedProbeFunction",
140142
"_CURRENT_VERSION",
141143
"_VERSIONED_CONSTRAINTS_CURRENT",
142144
"_VERSIONED_CONSTRAINTS_DATA",
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# Copyright 2025, 2026 Query Farm LLC - https://query.farm
2+
3+
"""typed_probe — exercises typed const-argument binding and typed column emit.
4+
5+
Const args cover the less-common Arrow scalar types — TIMESTAMPTZ, INTERVAL
6+
(duration), BLOB and UBIGINT — each with a default so calling ``typed_probe(n)``
7+
drives the default path and passing named args drives the scalar-extraction
8+
path. The output echoes the bound values into uint64 / int64 / blob / double
9+
columns. Values are echoed in normalized integer/byte form so this fixture and
10+
its vgi-go counterpart produce byte-identical results for the shared test.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import datetime
16+
from dataclasses import dataclass
17+
from typing import Annotated, ClassVar
18+
19+
import pyarrow as pa
20+
from vgi_rpc import ArrowSerializableDataclass
21+
from vgi_rpc.rpc import OutputCollector
22+
23+
from vgi.arguments import Arg
24+
from vgi.schema_utils import schema
25+
from vgi.table_function import (
26+
ProcessParams,
27+
TableFunctionGenerator,
28+
bind_fixed_schema,
29+
init_single_worker,
30+
)
31+
32+
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.UTC)
33+
34+
35+
def _iv_to_ms(iv: object) -> int:
    """Collapse a duration/interval const to whole milliseconds.

    A declared default arrives as a ``datetime.timedelta``; a SQL ``INTERVAL``
    literal arrives as a pyarrow ``MonthDayNano`` (DuckDB intervals are
    month-day-nano). Mirror vgi-go's GetScalarDuration collapse — months→30d,
    days→24h — so both implementations agree.

    Args:
        iv: A ``datetime.timedelta``, or an object exposing ``months`` /
            ``days`` / ``nanoseconds`` attributes. An attribute that is
            missing from an otherwise interval-shaped object counts as 0.

    Returns:
        The value in milliseconds. Sub-millisecond remainders are discarded
        by floor division.

    Raises:
        TypeError: If ``iv`` is neither a ``timedelta`` nor carries any of the
            interval attributes. Without this check such a value would be
            reported as 0 ms and mask a binding problem.
    """
    if isinstance(iv, datetime.timedelta):
        return iv // datetime.timedelta(milliseconds=1)

    interval_fields = ("months", "days", "nanoseconds")
    if not any(hasattr(iv, field) for field in interval_fields):
        raise TypeError(
            f"expected datetime.timedelta or a month/day/nanosecond interval, got {type(iv).__name__}"
        )

    months = getattr(iv, "months", 0)
    days = getattr(iv, "days", 0)
    nanos = getattr(iv, "nanoseconds", 0)

    ms_per_day = 24 * 3600 * 1000
    # Months are treated as a flat 30 days, days as a flat 24 hours.
    return months * 30 * ms_per_day + days * ms_per_day + nanos // 1_000_000
49+
50+
# Output layout of typed_probe. Column order here is the order of the emitted
# record batches.
TYPED_PROBE_SCHEMA = schema(
    **{
        "idx": pa.uint64(),  # row index
        "ts_us": pa.int64(),  # timestamp const, microseconds since the Unix epoch
        "iv_ms": pa.int64(),  # interval const, collapsed to milliseconds
        "payload": pa.binary(),  # blob const, echoed as-is
        "ub": pa.uint64(),  # unsigned const, echoed as-is
        "f": pa.float64(),  # float const plus the row index
    }
)
58+
59+
60+
@dataclass(kw_only=True)
61+
class TypedProbeArgs:
62+
"""Arguments for TypedProbeFunction — one named const per scalar type."""
63+
64+
n: Annotated[int, Arg(0, doc="Number of rows to emit", ge=0)]
65+
ts: Annotated[
66+
datetime.datetime,
67+
Arg(
68+
"ts",
69+
default=datetime.datetime(2026, 1, 2, 3, 4, 5, tzinfo=datetime.UTC),
70+
arrow_type=pa.timestamp("us", tz="UTC"),
71+
doc="Timestamp const (TIMESTAMPTZ)",
72+
),
73+
]
74+
iv: Annotated[
75+
datetime.timedelta,
76+
Arg(
77+
"iv",
78+
default=datetime.timedelta(milliseconds=1500),
79+
arrow_type=pa.duration("ns"),
80+
doc="Interval const (INTERVAL)",
81+
),
82+
]
83+
blob: Annotated[
84+
bytes,
85+
Arg("blob", default=b"vgi", arrow_type=pa.binary(), doc="Blob const (BLOB)"),
86+
]
87+
ub: Annotated[
88+
int,
89+
Arg("ub", default=9, arrow_type=pa.uint64(), doc="Unsigned const (UBIGINT)"),
90+
]
91+
f: Annotated[float, Arg("f", default=2.5, doc="Float const (DOUBLE)")]
92+
93+
94+
@dataclass(kw_only=True)
class TypedProbeState(ArrowSerializableDataclass):
    """Per-invocation state: the normalized const values and an emit cursor."""

    # Total number of rows requested.
    n: int
    # Timestamp const as microseconds since the Unix epoch.
    ts_us: int
    # Interval const collapsed to milliseconds.
    iv_ms: int
    # Blob const, carried through unchanged.
    payload: bytes
    # Unsigned const, carried through unchanged.
    ub: int
    # Float const; the row index is added to it at emit time.
    f: float
    # Index of the next row to emit; starts at 0.
    offset: int = 0
105+
106+
107+
@init_single_worker
@bind_fixed_schema
class TypedProbeFunction(TableFunctionGenerator[TypedProbeArgs, TypedProbeState]):
    """Table function that replays its typed const arguments as column values.

    The const arguments are normalized once in ``initial_state`` and then
    repeated on every output row by ``process``.
    """

    FIXED_SCHEMA: ClassVar[pa.Schema] = TYPED_PROBE_SCHEMA

    class Meta:
        """Name and description under which the function is exposed."""

        name = "typed_probe"
        description = "Echoes typed const args (timestamp/interval/blob/ubigint) into typed columns"

    @classmethod
    def initial_state(cls, params: ProcessParams[TypedProbeArgs]) -> TypedProbeState:
        """Build the starting state from the bound arguments.

        The timestamp becomes whole microseconds since ``_EPOCH`` and the
        interval becomes whole milliseconds; the remaining arguments are
        copied over unchanged.
        """
        bound = params.args
        one_microsecond = datetime.timedelta(microseconds=1)
        since_epoch = bound.ts - _EPOCH
        return TypedProbeState(
            n=bound.n,
            ts_us=since_epoch // one_microsecond,
            iv_ms=_iv_to_ms(bound.iv),
            payload=bound.blob,
            ub=bound.ub,
            f=bound.f,
        )

    @classmethod
    def process(cls, params: ProcessParams[TypedProbeArgs], state: TypedProbeState, out: OutputCollector) -> None:
        """Emit every not-yet-emitted row as one batch, or finish if none remain."""
        first, stop = state.offset, state.n
        if first < stop:
            indices = list(range(first, stop))
            count = len(indices)
            # Advance the cursor before emitting so a later call sees nothing left.
            state.offset = stop
            columns = {
                "idx": pa.array(indices, type=pa.uint64()),
                "ts_us": pa.array([state.ts_us] * count, type=pa.int64()),
                "iv_ms": pa.array([state.iv_ms] * count, type=pa.int64()),
                "payload": pa.array([state.payload] * count, type=pa.binary()),
                "ub": pa.array([state.ub] * count, type=pa.uint64()),
                # The float column varies per row: const value plus row index.
                "f": pa.array([state.f + row for row in indices], type=pa.float64()),
            }
            out.emit(pa.RecordBatch.from_pydict(columns, schema=TYPED_PROBE_SCHEMA))
        else:
            out.finish()

vgi/_test_fixtures/worker.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
RandomBytesFunction,
9090
RandomIntFunction,
9191
ReturnSecretValueFunction,
92+
ScaleBySettingFunction,
93+
SecretFieldFunction,
9294
SmartFormatPrefixFunction,
9395
SmartFormatWidthFunction,
9496
SumValuesFunction,
@@ -139,6 +141,7 @@
139141
MakeSeriesRangeFunction,
140142
MakeSeriesStepFunction,
141143
MissingBatchIndexTagFunction,
144+
TypedProbeFunction,
142145
NamedParamsEchoFunction,
143146
NestedSequenceFunction,
144147
NonMonotoneBatchIndexFunction,
@@ -360,6 +363,7 @@ def _build_enum_stats() -> dict[str, ColumnStatisticsInput]:
360363
MakeSeriesFloatFunction,
361364
MakeSeriesRangeFunction,
362365
MakeSeriesStepFunction,
366+
TypedProbeFunction,
363367
MakePairsIntFunction,
364368
MakePairsIntStrFunction,
365369
MakePairsStrFunction,
@@ -454,6 +458,8 @@ def _build_enum_stats() -> dict[str, ColumnStatisticsInput]:
454458
RandomBytesFunction,
455459
RandomIntFunction,
456460
ReturnSecretValueFunction,
461+
ScaleBySettingFunction,
462+
SecretFieldFunction,
457463
SmartFormatPrefixFunction,
458464
SmartFormatWidthFunction,
459465
SumValuesFunction,
@@ -1580,6 +1586,7 @@ class Settings:
15801586
greeting: Annotated[str, Setting(desc="Custom greeting message")] = "Hello"
15811587
multiplier: Annotated[int, Setting(desc="Value multiplier")] = 1
15821588
threshold: Annotated[int, Setting(desc="Filter threshold")] = 0
1589+
scale_factor: Annotated[float, Setting(desc="Float scale factor")] = 1.0
15831590
config: Annotated[ # type: ignore[valid-type]
15841591
pa.struct([("start", pa.int64()), ("step", pa.int64()), ("label", pa.string())]),
15851592
Setting(desc="Sequence configuration struct"),

0 commit comments

Comments
 (0)