Skip to content

Commit 42872fa

Browse files
rustyconoverclaude
andcommitted
Add protocol state to all output batches and catalog results
- Update ProtocolOutput.metadata() in table_function.py to include vgi.protocol_state: output metadata on all output batches - Update ProtocolOutput.metadata() in table_in_out_function.py similarly - Update worker.py catalog result serialization to include protocol state on all catalog_result messages using IPC streams This completes the protocol state requirement where all record batches must include protocol state metadata for validation. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 6e3960f commit 42872fa

3 files changed

Lines changed: 53 additions & 25 deletions

File tree

vgi/table_function.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,25 +151,29 @@ class ProtocolOutput:
151151

152152
def metadata(
153153
self, invocation: vgi.invocation.Invocation
154-
) -> pa.KeyValueMetadata | None:
154+
) -> pa.KeyValueMetadata:
155155
"""Create metadata for this output based on the status.
156156
157157
Args:
158158
invocation: The Invocation for this function invocation, passed through
159159
to Message.add_to_metadata() for correlation information.
160160
161161
Returns:
162-
KeyValueMetadata containing status and optional log message fields.
162+
KeyValueMetadata containing protocol state, status and optional log
163+
message fields.
163164
164165
"""
165-
metadata_dict: dict[str, str] = {}
166+
# Start with protocol state (required for VGI protocol)
167+
metadata_dict: dict[bytes, bytes] = {
168+
vgi.ipc_utils.PROTOCOL_STATE_KEY: vgi.ipc_utils.ProtocolState.OUTPUT.encode()
169+
}
166170

171+
# Add log message metadata if present
167172
if self.log_message is not None:
168-
metadata_dict = self.log_message.add_to_metadata(invocation, metadata_dict)
173+
for k, v in self.log_message.add_to_metadata(invocation, {}).items():
174+
metadata_dict[k.encode()] = v.encode()
169175

170-
return pa.KeyValueMetadata(
171-
{k.encode(): v.encode() for k, v in metadata_dict.items()}
172-
)
176+
return pa.KeyValueMetadata(metadata_dict)
173177

174178
@classmethod
175179
def from_process_result(cls, process_result: OutputComplete) -> ProtocolOutput:

vgi/table_in_out_function.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,25 +161,30 @@ class ProtocolOutput:
161161

162162
def metadata(
163163
self, invocation: vgi.invocation.Invocation
164-
) -> pa.KeyValueMetadata | None:
164+
) -> pa.KeyValueMetadata:
165165
"""Create metadata for this output based on the status.
166166
167167
Args:
168168
invocation: The Invocation for this function invocation, passed through
169169
to Message.add_to_metadata() for correlation information.
170170
171171
Returns:
172-
KeyValueMetadata containing status and optional log message fields.
172+
KeyValueMetadata containing protocol state, status and optional log
173+
message fields.
173174
174175
"""
175-
metadata_dict: dict[str, str] = {"vgi.status": self.status.value}
176+
# Start with protocol state (required for VGI protocol) and status
177+
metadata_dict: dict[bytes, bytes] = {
178+
vgi.ipc_utils.PROTOCOL_STATE_KEY: vgi.ipc_utils.ProtocolState.OUTPUT.encode(),
179+
b"vgi.status": self.status.value.encode(),
180+
}
176181

182+
# Add log message metadata if present
177183
if self.log_message is not None:
178-
metadata_dict = self.log_message.add_to_metadata(invocation, metadata_dict)
184+
for k, v in self.log_message.add_to_metadata(invocation, {}).items():
185+
metadata_dict[k.encode()] = v.encode()
179186

180-
return pa.KeyValueMetadata(
181-
{k.encode(): v.encode() for k, v in metadata_dict.items()}
182-
)
187+
return pa.KeyValueMetadata(metadata_dict)
183188

184189
@classmethod
185190
def from_process_result(

vgi/worker.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -974,28 +974,47 @@ def _handle_catalog_invocation(
974974

975975
fn_log.debug("catalog_method_result", result=result)
976976

977-
# Serialize and stream result
977+
# Serialize and stream result with protocol state metadata
978978
# Result types:
979979
# - None → empty batch (0 rows, 0 columns)
980980
# - list of primitives → convert to single-column batch (e.g., catalogs())
981981
# - Dataclass with serialize() → serialize to bytes, write
982982
# - Iterable of dataclasses → stream multiple serialized items
983+
from vgi.ipc_utils import ProtocolState, protocol_state_metadata
984+
985+
catalog_result_metadata = protocol_state_metadata(ProtocolState.CATALOG_RESULT)
986+
983987
if result is None:
984988
# Write empty batch to signal no result
985989
batch = pa.RecordBatch.from_pydict({})
986-
stdout.write(batch.schema.serialize().to_pybytes())
987-
stdout.write(batch.serialize().to_pybytes())
990+
with pa.ipc.new_stream(cast(IOBase, stdout), batch.schema) as writer:
991+
writer.write_batch(batch, custom_metadata=catalog_result_metadata)
988992
elif isinstance(result, list) and (
989993
not result or not hasattr(result[0], "serialize")
990994
):
991995
# List of primitives (e.g., strings from catalogs())
992996
batch = pa.RecordBatch.from_pydict({"value": result})
993-
stdout.write(batch.schema.serialize().to_pybytes())
994-
stdout.write(batch.serialize().to_pybytes())
997+
with pa.ipc.new_stream(cast(IOBase, stdout), batch.schema) as writer:
998+
writer.write_batch(batch, custom_metadata=catalog_result_metadata)
995999
elif hasattr(result, "serialize"):
996-
# Single dataclass result - write serialized bytes directly
997-
result_bytes = result.serialize()
998-
stdout.write(result_bytes)
1000+
# Single dataclass result - serialize with protocol state if possible
1001+
# Check if serialize() accepts custom_metadata parameter
1002+
import inspect
1003+
1004+
sig = inspect.signature(result.serialize)
1005+
if "custom_metadata" in sig.parameters:
1006+
result_bytes = result.serialize(custom_metadata=catalog_result_metadata)
1007+
stdout.write(result_bytes)
1008+
else:
1009+
# Fallback: use serialize() then we need to re-wrap with protocol state
1010+
# This is less efficient but ensures protocol state is included
1011+
result_bytes = result.serialize()
1012+
# Deserialize and re-serialize with protocol state
1013+
from vgi.ipc_utils import deserialize_record_batch
1014+
1015+
batch, _ = deserialize_record_batch(result_bytes)
1016+
with pa.ipc.new_stream(cast(IOBase, stdout), batch.schema) as writer:
1017+
writer.write_batch(batch, custom_metadata=catalog_result_metadata)
9991018
elif isinstance(result, list) and result and hasattr(result[0], "to_row_dict"):
10001019
# List of catalog objects with to_row_dict() - use efficient batch writing
10011020
# Determine the schema based on method and type parameter
@@ -1045,12 +1064,12 @@ def _handle_catalog_invocation(
10451064
# Fallback: use the ARROW_SCHEMA from the first item's class
10461065
schema = type(result[0]).ARROW_SCHEMA
10471066

1048-
# Collect all rows and write as single batch
1067+
# Collect all rows and write as single batch with protocol state
10491068
rows = [item.to_row_dict() for item in result]
10501069
batch = pa.RecordBatch.from_pylist(rows, schema=schema)
10511070

10521071
with pa.ipc.new_stream(cast(IOBase, stdout), schema) as writer:
1053-
writer.write_batch(batch)
1072+
writer.write_batch(batch, custom_metadata=catalog_result_metadata)
10541073
elif isinstance(result, list) and not result:
10551074
# Empty list - need to determine schema for empty batch
10561075
from vgi.catalog import (
@@ -1086,7 +1105,7 @@ def _handle_catalog_invocation(
10861105

10871106
batch = pa.RecordBatch.from_pylist([], schema=schema)
10881107
with pa.ipc.new_stream(cast(IOBase, stdout), schema) as writer:
1089-
writer.write_batch(batch)
1108+
writer.write_batch(batch, custom_metadata=catalog_result_metadata)
10901109
else:
10911110
raise TypeError(
10921111
f"Catalog method returned unsupported type: "

0 commit comments

Comments
 (0)