Skip to content

Commit c727df3

Browse files
rustyconoverclaude
andcommitted
refactor: unify storage protocols into single FunctionStorage interface
Merged InitStorage and WorkerStateStorage protocols into a unified FunctionStorage protocol with consistent naming: - global_put/get/delete/exists for init data - worker_put/collect for worker state - queue_push/pop/clear for work queue Merged InitStorageSqlite and WorkerStateStorageSqlite into single FunctionStorageSqlite implementation. Users now implement one class for custom storage backends. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent e47e026 commit c727df3

4 files changed

Lines changed: 223 additions & 214 deletions

File tree

.beads/issues.jsonl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{"id":"vgi-python-e37","title":"move Invocation from function.py out to own file","description":"The Invocation clas is kind of seperate from functions, so it should be in its own file. Move it and all of its other associated classes like InvocationType to its own file","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:18:46.605941-05:00","created_by":"rusty","updated_at":"2026-01-04T09:24:37.922675-05:00","closed_at":"2026-01-04T09:24:37.922675-05:00","close_reason":"Closed"}
22
{"id":"vgi-python-odi","title":"Change max_processes from method to property in Function hierarchy","description":"Refactor max_processes from a method to a property across the Function class hierarchy (Function, ScalarFunction, TableFunctionGenerator, TableInOutFunction, etc.). This makes the API more consistent since max_processes is effectively a constant per function class and properties are more idiomatic for such values.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T11:25:29.750648-05:00","created_by":"rusty","updated_at":"2026-01-04T11:50:57.566545-05:00","closed_at":"2026-01-04T11:50:57.566545-05:00","close_reason":"Closed"}
33
{"id":"vgi-python-p91","title":"Move exception classes from function.py to own file","description":"Move InitIdentifierError and SchemaValidationError from vgi/function.py to a new vgi/exceptions.py file. Update imports in function.py and any other files that reference these exceptions.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T09:12:28.058227-05:00","created_by":"rusty","updated_at":"2026-01-04T09:17:52.477661-05:00","closed_at":"2026-01-04T09:17:52.477661-05:00","close_reason":"Closed"}
4+
{"id":"vgi-python-zf7","title":"Unify storage protocols into single FunctionStorage interface","description":"## Problem\n\nCurrently there are two separate storage protocols (InitStorage and WorkerStateStorage) with inconsistent naming:\n- `create` vs `store` (different verbs for similar operations)\n- `collect_and_delete` is verbose and describes implementation\n\nUsers wanting a custom storage backend (Redis, DynamoDB, etc.) must implement two separate classes.\n\n## Solution\n\nUnify into a single `FunctionStorage` protocol with consistent naming using prefixes to group related methods:\n\n```python\nclass FunctionStorage(Protocol):\n \"\"\"Storage protocol for VGI distributed function execution.\n \n Three access patterns:\n - Global state: Init data shared across all workers (key-value with auto-generated keys)\n - Worker state: Partial results per worker (collected during finalization)\n - Work queue: Atomic work distribution across workers (FIFO queue)\n \"\"\"\n \n # Global state (init data)\n def global_put(self, value: bytes) -\u003e bytes: ... # Returns auto-generated key\n def global_get(self, key: bytes) -\u003e bytes: ...\n def global_delete(self, key: bytes) -\u003e None: ...\n def global_exists(self, key: bytes) -\u003e bool: ...\n \n # Worker state (partial results per worker)\n def worker_put(self, invocation_id: bytes, worker_id: int, state: bytes) -\u003e None: ...\n def worker_collect(self, invocation_id: bytes) -\u003e list[bytes]: ... # Atomic collect+delete\n \n # Work queue (distributed work items)\n def queue_push(self, invocation_id: bytes, items: list[bytes]) -\u003e int: ...\n def queue_pop(self, invocation_id: bytes) -\u003e bytes | None: ... # Atomic claim\n def queue_clear(self, invocation_id: bytes) -\u003e int: ...\n```\n\n## Design Rationale\n\n- **Prefixes** (`global_`, `worker_`, `queue_`): Clear grouping, good autocomplete\n- **Consistent verbs**: `put/get` for storage, `push/pop` for queue\n- **Minimal interface**: 9 methods total (down from 9, but now unified)\n- **Single class variable** in Function: `storage: ClassVar[FunctionStorage]`\n\n## Files to Change\n\n- `vgi/function_storage.py`: New protocol + merged FunctionStorageSqlite\n- `vgi/function.py`: Single `storage` class variable, update all method calls","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T12:34:25.966005-05:00","created_by":"rusty","updated_at":"2026-01-04T12:58:16.913278-05:00","closed_at":"2026-01-04T12:58:16.913278-05:00","close_reason":"Closed"}

vgi/examples/table.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -473,11 +473,11 @@ def cardinality(self) -> TableCardinality:
473473
"""
474474
return TableCardinality(estimate=self.count, max=self.count)
475475

476-
def perform_init(self, init_input: pa.RecordBatch) -> InitResult:
476+
def initialize_global_state(self, init_input: pa.RecordBatch) -> InitResult:
477477
"""Populate the work queue with range chunks."""
478-
# Parse init data and store in init_storage
479-
self.init_data = TableFunctionInitInput.deserialize(init_input)
480-
self.init_identifier = self.init_storage.create(self.init_data.serialize())
478+
# Parse init data and store in storage
479+
self.init_input = TableFunctionInitInput.deserialize(init_input)
480+
self.execution_identifier = self.storage.global_put(self.init_input.serialize())
481481

482482
# Create work items for each chunk of the range
483483
work_items: list[bytes] = []
@@ -489,7 +489,7 @@ def perform_init(self, init_input: pa.RecordBatch) -> InitResult:
489489
if work_items:
490490
self.enqueue_work(work_items)
491491

492-
return InitResult(self.init_identifier)
492+
return InitResult(self.execution_identifier)
493493

494494
def process(self) -> OutputGenerator:
495495
"""Generate values by pulling chunks from the work queue."""
@@ -579,7 +579,7 @@ class Meta:
579579

580580
@property
581581
def output_schema(self) -> pa.Schema:
582-
"""Return the projected schema based on init_data."""
582+
"""Return the projected schema based on init_input."""
583583
return self.apply_projection(self.FULL_SCHEMA)
584584

585585
def cardinality(self) -> TableCardinality:
@@ -591,8 +591,8 @@ def _get_projected_column_indices(self) -> list[int]:
591591
592592
Returns indices from projection_ids if set, otherwise all columns.
593593
"""
594-
if self.init_data and self.init_data.projection_ids is not None:
595-
return self.init_data.projection_ids
594+
if self.init_input and self.init_input.projection_ids is not None:
595+
return self.init_input.projection_ids
596596
return list(range(len(self.FULL_SCHEMA)))
597597

598598
def process(self) -> OutputGenerator:

vgi/function.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
import vgi.ipc_utils
4444
from vgi.exceptions import ExecutionIdentifierError, SchemaValidationError
45-
from vgi.function_storage import SqliteInitStorage, SqliteWorkerStateStorage
45+
from vgi.function_storage import FunctionStorage, FunctionStorageSqlite
4646
from vgi.invocation import InitResult, Invocation
4747
from vgi.metadata import MetadataMixin, ResolvedMetadata
4848

@@ -262,10 +262,7 @@ class Meta:
262262
263263
"""
264264

265-
global_state_storage: ClassVar[SqliteInitStorage] = SqliteInitStorage()
266-
worker_state_storage: ClassVar[SqliteWorkerStateStorage] = (
267-
SqliteWorkerStateStorage()
268-
)
265+
storage: ClassVar[FunctionStorage] = FunctionStorageSqlite()
269266

270267
# Cache for resolved metadata
271268
_metadata_cache: ClassVar[ResolvedMetadata | None] = None
@@ -370,7 +367,7 @@ def process(self, batch: pa.RecordBatch) -> OutputGenerator:
370367
"Cannot store state: execution_identifier is not set. "
371368
"Call initialize_global_state() or load_global_state() first."
372369
)
373-
self.worker_state_storage.store(
370+
self.storage.worker_put(
374371
self.execution_identifier,
375372
os.getpid(),
376373
state.serialize(),
@@ -406,9 +403,7 @@ def finalize(self) -> OutputGenerator:
406403
"Cannot collect states: execution_identifier is not set. "
407404
"Call initialize_global_state() or load_global_state() first."
408405
)
409-
state_bytes_list = self.worker_state_storage.collect_and_delete(
410-
self.execution_identifier
411-
)
406+
state_bytes_list = self.storage.worker_collect(self.execution_identifier)
412407
return [state_class.deserialize(data) for data in state_bytes_list]
413408

414409
def enqueue_work(self, work_items: list[bytes]) -> int:
@@ -442,9 +437,7 @@ def initialize_global_state(self, init_input: pa.RecordBatch) -> InitResult:
442437
"Cannot enqueue work: execution_identifier is not set. "
443438
"Call enqueue_work() after initialize_global_state() has completed."
444439
)
445-
return self.worker_state_storage.enqueue_work(
446-
self.execution_identifier, work_items
447-
)
440+
return self.storage.queue_push(self.execution_identifier, work_items)
448441

449442
def dequeue_work(self) -> bytes | None:
450443
"""Claim and return the next work item from the queue.
@@ -477,7 +470,7 @@ def process(self) -> OutputGenerator:
477470
"Cannot dequeue work: execution_identifier is not set. "
478471
"Call initialize_global_state() or load_global_state() first."
479472
)
480-
return self.worker_state_storage.dequeue_work(self.execution_identifier)
473+
return self.storage.queue_pop(self.execution_identifier)
481474

482475
@final
483476
@cached_property
@@ -534,9 +527,7 @@ def initialize_global_state(self, init_input: pa.RecordBatch) -> InitResult:
534527
"""Perform a new init call and store it in the storage."""
535528
self.init_input = self.InitInputType.deserialize(init_input)
536529
assert self.init_input is not None
537-
self.execution_identifier = self.global_state_storage.create(
538-
self.init_input.serialize()
539-
)
530+
self.execution_identifier = self.storage.global_put(self.init_input.serialize())
540531
return InitResult(self.execution_identifier)
541532

542533
def load_global_state(self, init_input: InitResult) -> None:
@@ -548,7 +539,7 @@ def load_global_state(self, init_input: InitResult) -> None:
548539
)
549540
self.execution_identifier = init_input.global_execution_identifier
550541
self.init_input = self.InitInputType.deserialize_bytes(
551-
self.global_state_storage.get(self.execution_identifier)
542+
self.storage.global_get(self.execution_identifier)
552543
)
553544

554545
def setup(self) -> None:

0 commit comments

Comments
 (0)