Skip to content

Commit ed84c85

Browse files
rustyconover and claude committed
Add arrow-ipc output format to vgi-client CLI
Add --format arrow-ipc option for outputting function results in Apache Arrow IPC streaming format.

This is useful for:
- Debugging VGI protocol issues
- Inspecting raw output data with Arrow tools
- Piping data to other Arrow-aware tools

Usage:
    vgi-client --function sequence --args '[10]' --format arrow-ipc -o out.arrow
    vgi-client --function echo --input data.parquet --format arrow-ipc -o -

The output can be read with any Arrow implementation:
    with open('out.arrow', 'rb') as f:
        reader = ipc.open_stream(f)
        table = reader.read_all()

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent fdbed72 commit ed84c85

2 files changed

Lines changed: 80 additions & 6 deletions

File tree

tests/client/test_cli.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,37 @@ def test_output_format_parquet(
442442
table = pq.read_table(str(output_file))
443443
assert table.num_rows == 3
444444

445+
def test_output_format_arrow_ipc(
    self, example_worker: str, input_parquet: Path, tmp_path: Path
) -> None:
    """Run `echo` with `--format arrow-ipc` and read the result as an IPC stream.

    Invokes the CLI against the example worker, writing to a file under
    ``tmp_path``, then checks that the file opens with the Arrow streaming
    reader and carries the expected rows and the ``id`` column.
    """
    from pyarrow import ipc

    destination = tmp_path / "output.arrow"
    cli_args = [
        "--input",
        str(input_parquet),
        "--output",
        str(destination),
        "--format",
        "arrow-ipc",
        "--function",
        "echo",
        "--server",
        example_worker,
    ]

    outcome = CliRunner().invoke(cli, cli_args)
    assert outcome.exit_code == 0

    # Reading back with the streaming reader is what proves the output is a
    # valid Arrow IPC stream; read_all() materialises it before the file closes.
    with open(destination, "rb") as stream:
        echoed = ipc.open_stream(stream).read_all()

    assert echoed.num_rows == 3
    assert "id" in echoed.schema.names
475+
445476

446477
class TestCLIOptions:
447478
"""Tests for various CLI options."""

vgi/client/cli.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
vgi-client --input data.parquet --function transform --args '["prefix"]' \
2222
--table-input-position 1
2323
24+
# Output in Arrow IPC format (useful for debugging):
25+
vgi-client --function sequence --args '[10]' --format arrow-ipc -o out.arrow
26+
vgi-client --function echo --input data.parquet --format arrow-ipc -o -
27+
2428
# Catalog operations (all nested under 'catalog'):
2529
vgi-client catalog list --server vgi-example-worker
2630
vgi-client catalog attach example --server vgi-example-worker
@@ -37,6 +41,7 @@
3741
from typing import TYPE_CHECKING, Any, cast
3842

3943
import pyarrow as pa
44+
from pyarrow import ipc
4045

4146
from vgi.arguments import Arguments
4247
from vgi.client.client import Client, ClientError, log
@@ -46,7 +51,21 @@
4651

4752

4853
class OutputWriter:
49-
"""Handles writing output batches in various formats."""
54+
"""Handles writing output batches in various formats.
55+
56+
Supported formats:
57+
- json: JSON Lines format (one JSON object per row)
58+
- csv: CSV with header
59+
- parquet: Apache Parquet columnar format
60+
- arrow-ipc: Apache Arrow IPC streaming format (useful for debugging)
61+
62+
The arrow-ipc format writes batches in the standard Arrow IPC streaming
63+
format, which can be read by any Arrow implementation. This is useful for:
64+
- Debugging VGI protocol issues
65+
- Inspecting raw output data with tools like pyarrow or arrow CLI
66+
- Piping data to other Arrow-aware tools
67+
68+
"""
5069

5170
def __init__(
5271
self, output_file: str | None, format: str, schema: pa.Schema | None = None
@@ -55,20 +74,23 @@ def __init__(
5574
5675
Args:
5776
output_file: Path to output file, "-" for stdout, or None for logging.
58-
format: Output format ("parquet", "csv", or "json").
77+
format: Output format ("parquet", "csv", "json", or "arrow-ipc").
5978
schema: Optional schema for the output data.
6079
6180
"""
6281
self.output_file = output_file
6382
self.format = format
6483
self.schema = schema
65-
self._writer: pq.ParquetWriter | None = None
84+
self._writer: pq.ParquetWriter | ipc.RecordBatchStreamWriter | None = None
6685
self._is_stdout = output_file == "-"
6786
self._first_write = True
87+
self._output_file_handle: io.IOBase | None = None
6888

6989
def _get_output_stream(self) -> Any:
7090
if self._is_stdout:
71-
return sys.stdout.buffer if self.format == "parquet" else sys.stdout
91+
if self.format in ("parquet", "arrow-ipc"):
92+
return sys.stdout.buffer
93+
return sys.stdout
7294
return self.output_file
7395

7496
def write_batch(self, batch: pa.RecordBatch) -> None:
@@ -91,6 +113,21 @@ def write_batch(self, batch: pa.RecordBatch) -> None:
91113
self._writer = pq.ParquetWriter(self.output_file, batch.schema)
92114
self._writer.write_batch(batch)
93115

116+
elif self.format == "arrow-ipc":
117+
if self._writer is None:
118+
if self._is_stdout:
119+
sink = pa.PythonFile(cast(io.IOBase, sys.stdout.buffer), mode="w")
120+
else:
121+
# Open file and keep handle for closing in close()
122+
self._output_file_handle = open( # noqa: SIM115
123+
self.output_file, "wb"
124+
)
125+
sink = pa.PythonFile(self._output_file_handle, mode="w")
126+
self._writer = ipc.new_stream(sink, batch.schema)
127+
# Type narrowing for mypy
128+
assert isinstance(self._writer, ipc.RecordBatchStreamWriter)
129+
self._writer.write_batch(batch)
130+
94131
elif self.format == "csv":
95132
output = self._get_output_stream()
96133
write_options = csv.WriteOptions(include_header=self._first_write)
@@ -127,6 +164,8 @@ def close(self) -> None:
127164
"""Close the underlying writer if one exists."""
128165
if self._writer is not None:
129166
self._writer.close()
167+
if self._output_file_handle is not None:
168+
self._output_file_handle.close()
130169

131170

132171
def _create_cli() -> Any:
@@ -154,9 +193,13 @@ def _create_cli() -> Any:
154193
@click.option(
155194
"--format",
156195
"output_format",
157-
type=click.Choice(["json", "csv", "parquet"]),
196+
type=click.Choice(["json", "csv", "parquet", "arrow-ipc"]),
158197
default="json",
159-
help="Output format (default: json)",
198+
help=(
199+
"Output format (default: json). Use 'arrow-ipc' for Apache Arrow IPC "
200+
"streaming format, which is useful for debugging or piping to other "
201+
"Arrow-aware tools."
202+
),
160203
)
161204
@click.option(
162205
"--function",

0 commit comments

Comments
 (0)