From 099b27b63b7fb47b5754176fd4390ddee43b6129 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Fri, 29 May 2026 14:55:18 +0200 Subject: [PATCH 1/3] failing test case + bugfix --- src/dbzero/core/collections/vector/v_sorted_vector.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbzero/core/collections/vector/v_sorted_vector.hpp b/src/dbzero/core/collections/vector/v_sorted_vector.hpp index 9515d0ea..2ed7b3cf 100644 --- a/src/dbzero/core/collections/vector/v_sorted_vector.hpp +++ b/src/dbzero/core/collections/vector/v_sorted_vector.hpp @@ -872,7 +872,7 @@ DB0_PACKED_END bool updateExisting(const data_t &data, data_t *old_data = nullptr) { auto it = (*this)->find(data); - if (it == (*this)->end()) { + if (!it) { return false; } @@ -888,7 +888,7 @@ DB0_PACKED_END bool findOne(data_t &data) const { auto it = (*this)->find(data); - if (it == (*this)->end()) { + if (!it) { return false; } From fa352236388b386320fa2c66abe455c376c732c0 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Fri, 29 May 2026 15:44:13 +0200 Subject: [PATCH 2/3] bugfixes / proper sync kv-index ref --- python_tests/test_issues_16.py | 139 ++++++++++++++++++ python_tests/test_issues_17.py | 103 +++++++++++++ .../collections/vector/v_sorted_vector.hpp | 8 +- src/dbzero/object_model/object/Object.cpp | 62 +++----- src/dbzero/object_model/object/Object.hpp | 1 + 5 files changed, 268 insertions(+), 45 deletions(-) create mode 100644 python_tests/test_issues_16.py create mode 100644 python_tests/test_issues_17.py diff --git a/python_tests/test_issues_16.py b/python_tests/test_issues_16.py new file mode 100644 index 00000000..bd277b28 --- /dev/null +++ b/python_tests/test_issues_16.py @@ -0,0 +1,139 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. 
+ +import os +import subprocess +import sys +import textwrap + + +def test_repeated_memo_graph_creation_with_tagged_shared_lookup_does_not_segfault(tmp_path): + """Regression for the statek Job construction segfault.""" + script = textwrap.dedent( + f""" + from dataclasses import dataclass + from decimal import Decimal + from typing import Optional + import os + + import dbzero as db0 + + db_path = {str(tmp_path / "db0")!r} + os.mkdir(db_path) + db0.init(db_path) + db0.open("p") + + JobStatus = db0.enum("Issue16JobStatus", ["READY", "DONE"]) + + @db0.memo + @dataclass + class Pricing: + input_price_per_M: Optional[Decimal] = None + input_price_per_cached_M: Optional[Decimal] = None + output_price_per_M: Optional[Decimal] = None + + @db0.memo(no_default_tags=True) + @dataclass + class Usage: + pricing: Pricing + context_bytes: int = 0 + total_bytes_sent: int = 0 + total_bytes_received: int = 0 + total_input_tokens: int = 0 + total_cached_tokens: int = 0 + total_output_tokens: int = 0 + total_reported_cost: Optional[float] = None + + @db0.memo + class RuntimeState: + def __init__(self): + self.local_state = {{}} + self.console = [] + self.exceptions = None + self.exit_status = None + + @db0.memo + class Agent: + def __init__(self, role, metadata, tools): + self.role = role + self._metadata = metadata + self._tools = tools + + @db0.memo + @dataclass + class JobDef: + agent: Agent + metadata: dict = None + job_params: dict = None + warmup_code: object = None + + def __post_init__(self): + if self.agent is not None: + db0.tags(self).add(self.agent) + if self.metadata is None: + self.metadata = self.agent._metadata + + @db0.memo + class Job: + def __init__(self, job_def, job_status=JobStatus.READY): + self.job_def = job_def + self.parent_job = None + if self.job_def.agent is not None: + db0.tags(self).add(self.job_def.agent) + self.__job_status = None + self.set_status(job_status) + self.runtime_state = RuntimeState() + self.chat_log = [] + self.awaited_result = None + 
self.next_instr_num = None + self.warmup_block_num = None + self.error = None + self.created_at = None + self.error_handlers = [] + self.__last_difficulty = None + self.usage = Usage(pricing=self.current_pricing()) + self.error = None + self.num_completions = None + self.__ext_ref = None + self.__pending_chat_log = [] + + def set_status(self, new_status): + if self.__job_status is not None: + db0.tags(self).remove(self.__job_status) + db0.tags(self).add(new_status) + self.__job_status = new_status + + def current_pricing(self): + existing = next(iter(db0.find(Pricing, "UNKNOWN", "test-model")), None) + if existing is not None: + return existing + pricing = Pricing() + db0.tags(pricing).add(["UNKNOWN", "test-model", "USAGE"]) + return pricing + + def make_job(): + agent = Agent("test", {{"MODEL": "test-model"}}, []) + job_def = JobDef(agent) + return Job(job_def) + + make_job() + make_job() + db0.close() + """ + ) + + env = os.environ.copy() + env["PYTHONDONTWRITEBYTECODE"] = "1" + result = subprocess.run( + [sys.executable, "-c", script], + check=False, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, ( + f"subprocess exited with {result.returncode}\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) diff --git a/python_tests/test_issues_17.py b/python_tests/test_issues_17.py new file mode 100644 index 00000000..f63d0015 --- /dev/null +++ b/python_tests/test_issues_17.py @@ -0,0 +1,103 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. 
+ +import os +import subprocess +import sys +import textwrap + + +def test_statek_future_error_warmup_keeps_job_status_readable(tmp_path): + """Focused repro for the statek FutureError warmup failure.""" + script = textwrap.dedent( + f""" + import asyncio + import os + from dataclasses import dataclass + + import dbzero as db0 + + from statek.agents.agent import Agent + from statek.exceptions import FutureError + from statek.executors.job import Job, JobDef, JobStatus + from statek.executors.utils import run_job_step + from statek.future import FutureResult + from statek.prompt_config import make_system_prompt + + db_path = {str(tmp_path / "db0")!r} + os.mkdir(db_path) + db0.init(db_path, read_write=True) + db0.open("test_prefix", "rw") + + @db0.memo + @dataclass + class MemoObject: + value: int = 0 + + def check_condition_false(_): + return False + + def fetch_result_not_ready(future_result): + raise FutureError(future_result=future_result) + + future = FutureResult(deps=MemoObject(value=0), state_num=0) + future.set_complement_functions( + complement=fetch_result_not_ready, + condition=check_condition_false, + ) + + agent = Agent( + role="test", + _system_prompt=make_system_prompt("Test"), + _metadata={{"MODEL": "test-model"}}, + _tools=[], + ) + job_def = JobDef( + agent=agent, + metadata={{"MODEL": "test-model"}}, + warmup_code=[ + "counter = counter + 1\\n" + "before_flag = True\\n" + "result = future_val\\n" + "after_flag = True", + 'exit("ok")', + ], + ) + job = Job( + job_def=job_def, + model_family="test", + model="test-model", + job_status=JobStatus.READY, + ) + job.py_env.local_state["counter"] = 0 + job.py_env.local_state["before_flag"] = False + job.py_env.local_state["after_flag"] = False + job.py_env.local_state["future_val"] = future + + result = asyncio.run(run_job_step(job)) + assert result is False + assert job.status == JobStatus.WARMING_UP + assert job.awaited_result is future + assert job.next_instr_num == 2 + + db0.close() + """ + ) + + env = 
os.environ.copy() + env["PYTHONDONTWRITEBYTECODE"] = "1" + env["PYTHONPATH"] = "/src/statek" + os.pathsep + env.get("PYTHONPATH", "") + + result = subprocess.run( + [sys.executable, "-c", script], + check=False, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, ( + f"subprocess exited with {result.returncode}\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) diff --git a/src/dbzero/core/collections/vector/v_sorted_vector.hpp b/src/dbzero/core/collections/vector/v_sorted_vector.hpp index 2ed7b3cf..f6ea765b 100644 --- a/src/dbzero/core/collections/vector/v_sorted_vector.hpp +++ b/src/dbzero/core/collections/vector/v_sorted_vector.hpp @@ -871,8 +871,8 @@ DB0_PACKED_END */ bool updateExisting(const data_t &data, data_t *old_data = nullptr) { - auto it = (*this)->find(data); - if (!it) { + auto it = find(data); + if (it == end()) { return false; } @@ -887,8 +887,8 @@ DB0_PACKED_END bool findOne(data_t &data) const { - auto it = (*this)->find(data); - if (!it) { + auto it = find(data); + if (it == end()) { return false; } diff --git a/src/dbzero/object_model/object/Object.cpp b/src/dbzero/object_model/object/Object.cpp index 37ec57fd..9be4cf34 100644 --- a/src/dbzero/object_model/object/Object.cpp +++ b/src/dbzero/object_model/object/Object.cpp @@ -290,6 +290,18 @@ namespace db0::object_model return m_kv_index.get(); } + + void Object::syncKVIndexRef(KV_Index *kv_index_ptr) + { + assert(kv_index_ptr); + auto address = kv_index_ptr->getAddress(); + auto type = kv_index_ptr->getIndexType(); + if ((*this)->m_kv_address != address || (*this)->m_kv_type != type) { + auto &object = this->modify(); + object.m_kv_address = address; + object.m_kv_type = type; + } + } void Object::addToKVIndex(FixtureLock &fixture, FieldID field_id, unsigned int fidelity, StorageClass storage_class, Value value) @@ -309,18 +321,10 @@ namespace db0::object_model lofi_store<2>::fromValue(kv_value).set(field_id.getOffset(), value.m_store); 
xvalue.m_value = kv_value; kv_index_ptr->updateExisting(xvalue); - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } else { - if (kv_index_ptr->insert(xvalue)) { - // type or address of the kv-index has changed which needs to be reflected - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + kv_index_ptr->insert(xvalue); + syncKVIndexRef(kv_index_ptr); } } @@ -345,21 +349,10 @@ namespace db0::object_model // mark as deleted in kv-index xvalue.m_type = StorageClass::DELETED; kv_index_ptr->updateExisting(xvalue); - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } else { - auto old_addr = kv_index_ptr->getAddress(); kv_index_ptr->erase(xvalue); - auto new_addr = kv_index_ptr->getAddress(); - if (new_addr != old_addr) { - // type or address of the kv-index has changed which needs to be reflected - this->modify().m_kv_address = new_addr; - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } m_type->removeFromSchema(field_id, fidelity, getSchemaTypeId(xvalue.m_type)); } else { @@ -378,12 +371,7 @@ namespace db0::object_model } xvalue.m_value = value; kv_index_ptr->updateExisting(xvalue); - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = 
kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); m_type->removeFromSchema(field_id, fidelity, old_type_id); } @@ -447,18 +435,10 @@ namespace db0::object_model auto new_type_id = getSchemaTypeId(storage_class, value); m_type->updateSchema(field_id, fidelity, old_type_id, new_type_id); } - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } else { - if (kv_index_ptr->insert(xvalue)) { - // type or address of the kv-index has changed which needs to be reflected - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + kv_index_ptr->insert(xvalue); + syncKVIndexRef(kv_index_ptr); m_type->addToSchema(field_id, fidelity, getSchemaTypeId(storage_class, value)); } diff --git a/src/dbzero/object_model/object/Object.hpp b/src/dbzero/object_model/object/Object.hpp index d3968b73..0e3c98e3 100644 --- a/src/dbzero/object_model/object/Object.hpp +++ b/src/dbzero/object_model/object/Object.hpp @@ -88,6 +88,7 @@ namespace db0::object_model KV_Index *addKV_First(const XValue &); KV_Index *tryGetKV_Index() const; + void syncKVIndexRef(KV_Index *); bool hasKV_Index() const; From 3242d06067ba3f581c95c2d92a5b948ac6306a23 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Fri, 29 May 2026 17:20:01 +0200 Subject: [PATCH 3/3] test fix --- python_tests/test_issues_16.py | 2 +- python_tests/test_issues_17.py | 115 +++++++-------------------------- 2 files changed, 25 insertions(+), 92 deletions(-) diff --git a/python_tests/test_issues_16.py b/python_tests/test_issues_16.py index bd277b28..35a5d5d2 100644 --- a/python_tests/test_issues_16.py +++ b/python_tests/test_issues_16.py @@ 
-8,7 +8,7 @@ def test_repeated_memo_graph_creation_with_tagged_shared_lookup_does_not_segfault(tmp_path): - """Regression for the statek Job construction segfault.""" + """Regression for repeated memo graph construction with tagged shared lookup.""" script = textwrap.dedent( f""" from dataclasses import dataclass diff --git a/python_tests/test_issues_17.py b/python_tests/test_issues_17.py index f63d0015..0567adb6 100644 --- a/python_tests/test_issues_17.py +++ b/python_tests/test_issues_17.py @@ -1,103 +1,36 @@ # SPDX-License-Identifier: LGPL-2.1-or-later # Copyright (c) 2025 DBZero Software sp. z o.o. -import os -import subprocess -import sys -import textwrap +from dataclasses import dataclass +from itertools import product +from typing import Optional +import dbzero as db0 -def test_statek_future_error_warmup_keeps_job_status_readable(tmp_path): - """Focused repro for the statek FutureError warmup failure.""" - script = textwrap.dedent( - f""" - import asyncio - import os - from dataclasses import dataclass - import dbzero as db0 +@db0.memo(immutable=True, intern=True) +@dataclass +class Issue17PackedMask: + create: Optional[bool] = None + read: Optional[bool] = None + update: Optional[bool] = None + delete: Optional[bool] = None - from statek.agents.agent import Agent - from statek.exceptions import FutureError - from statek.executors.job import Job, JobDef, JobStatus - from statek.executors.utils import run_job_step - from statek.future import FutureResult - from statek.prompt_config import make_system_prompt - db_path = {str(tmp_path / "db0")!r} - os.mkdir(db_path) - db0.init(db_path, read_write=True) - db0.open("test_prefix", "rw") +def test_interned_immutable_object_with_only_pack_2_fields_materializes(db0_fixture): + field_names = ("create", "read", "update", "delete") + values = (None, False, True) + seen_uuids = set() - @db0.memo - @dataclass - class MemoObject: - value: int = 0 + for combination in product(values, repeat=len(field_names)): + kwargs = 
dict(zip(field_names, combination)) - def check_condition_false(_): - return False + materialized = db0.materialized(Issue17PackedMask(**kwargs)) + duplicate = db0.materialized(Issue17PackedMask(**kwargs)) - def fetch_result_not_ready(future_result): - raise FutureError(future_result=future_result) + assert tuple(getattr(materialized, name) for name in field_names) == combination + assert tuple(getattr(duplicate, name) for name in field_names) == combination + assert db0.uuid(duplicate) == db0.uuid(materialized) + seen_uuids.add(db0.uuid(materialized)) - future = FutureResult(deps=MemoObject(value=0), state_num=0) - future.set_complement_functions( - complement=fetch_result_not_ready, - condition=check_condition_false, - ) - - agent = Agent( - role="test", - _system_prompt=make_system_prompt("Test"), - _metadata={{"MODEL": "test-model"}}, - _tools=[], - ) - job_def = JobDef( - agent=agent, - metadata={{"MODEL": "test-model"}}, - warmup_code=[ - "counter = counter + 1\\n" - "before_flag = True\\n" - "result = future_val\\n" - "after_flag = True", - 'exit("ok")', - ], - ) - job = Job( - job_def=job_def, - model_family="test", - model="test-model", - job_status=JobStatus.READY, - ) - job.py_env.local_state["counter"] = 0 - job.py_env.local_state["before_flag"] = False - job.py_env.local_state["after_flag"] = False - job.py_env.local_state["future_val"] = future - - result = asyncio.run(run_job_step(job)) - assert result is False - assert job.status == JobStatus.WARMING_UP - assert job.awaited_result is future - assert job.next_instr_num == 2 - - db0.close() - """ - ) - - env = os.environ.copy() - env["PYTHONDONTWRITEBYTECODE"] = "1" - env["PYTHONPATH"] = "/src/statek" + os.pathsep + env.get("PYTHONPATH", "") - - result = subprocess.run( - [sys.executable, "-c", script], - check=False, - env=env, - text=True, - capture_output=True, - ) - - assert result.returncode == 0, ( - f"subprocess exited with {result.returncode}\n" - f"stdout:\n{result.stdout}\n" - 
f"stderr:\n{result.stderr}" - ) + assert len(seen_uuids) == len(values) ** len(field_names)