diff --git a/python_tests/test_issues_16.py b/python_tests/test_issues_16.py new file mode 100644 index 00000000..bd277b28 --- /dev/null +++ b/python_tests/test_issues_16.py @@ -0,0 +1,139 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. + +import os +import subprocess +import sys +import textwrap + + +def test_repeated_memo_graph_creation_with_tagged_shared_lookup_does_not_segfault(tmp_path): + """Regression for the statek Job construction segfault.""" + script = textwrap.dedent( + f""" + from dataclasses import dataclass + from decimal import Decimal + from typing import Optional + import os + + import dbzero as db0 + + db_path = {str(tmp_path / "db0")!r} + os.mkdir(db_path) + db0.init(db_path) + db0.open("p") + + JobStatus = db0.enum("Issue16JobStatus", ["READY", "DONE"]) + + @db0.memo + @dataclass + class Pricing: + input_price_per_M: Optional[Decimal] = None + input_price_per_cached_M: Optional[Decimal] = None + output_price_per_M: Optional[Decimal] = None + + @db0.memo(no_default_tags=True) + @dataclass + class Usage: + pricing: Pricing + context_bytes: int = 0 + total_bytes_sent: int = 0 + total_bytes_received: int = 0 + total_input_tokens: int = 0 + total_cached_tokens: int = 0 + total_output_tokens: int = 0 + total_reported_cost: Optional[float] = None + + @db0.memo + class RuntimeState: + def __init__(self): + self.local_state = {{}} + self.console = [] + self.exceptions = None + self.exit_status = None + + @db0.memo + class Agent: + def __init__(self, role, metadata, tools): + self.role = role + self._metadata = metadata + self._tools = tools + + @db0.memo + @dataclass + class JobDef: + agent: Agent + metadata: dict = None + job_params: dict = None + warmup_code: object = None + + def __post_init__(self): + if self.agent is not None: + db0.tags(self).add(self.agent) + if self.metadata is None: + self.metadata = self.agent._metadata + + @db0.memo + class Job: + def __init__(self, job_def, job_status=JobStatus.READY): + self.job_def = job_def + self.parent_job = None + if self.job_def.agent is not None: + db0.tags(self).add(self.job_def.agent) + self.__job_status = None + self.set_status(job_status) + self.runtime_state = RuntimeState() + self.chat_log = [] + self.awaited_result = None + self.next_instr_num = None + self.warmup_block_num = None + self.error = None + self.created_at = None + self.error_handlers = [] + self.__last_difficulty = None + self.usage = Usage(pricing=self.current_pricing()) + self.error = None + self.num_completions = None + self.__ext_ref = None + self.__pending_chat_log = [] + + def set_status(self, new_status): + if self.__job_status is not None: + db0.tags(self).remove(self.__job_status) + db0.tags(self).add(new_status) + self.__job_status = new_status + + def current_pricing(self): + existing = next(iter(db0.find(Pricing, "UNKNOWN", "test-model")), None) + if existing is not None: + return existing + pricing = Pricing() + db0.tags(pricing).add(["UNKNOWN", "test-model", "USAGE"]) + return pricing + + def make_job(): + agent = Agent("test", {{"MODEL": "test-model"}}, []) + job_def = JobDef(agent) + return Job(job_def) + + make_job() + make_job() + db0.close() + """ + ) + + env = os.environ.copy() + env["PYTHONDONTWRITEBYTECODE"] = "1" + result = subprocess.run( + [sys.executable, "-c", script], + check=False, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, ( + f"subprocess exited with {result.returncode}\\n" + f"stdout:\\n{result.stdout}\\n" + f"stderr:\\n{result.stderr}" + ) diff --git a/python_tests/test_issues_17.py b/python_tests/test_issues_17.py new file mode 100644 index 00000000..f63d0015 --- /dev/null +++ b/python_tests/test_issues_17.py @@ -0,0 +1,103 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# Copyright (c) 2025 DBZero Software sp. z o.o. + +import os +import subprocess +import sys +import textwrap + + +def test_statek_future_error_warmup_keeps_job_status_readable(tmp_path): + """Focused repro for the statek FutureError warmup failure.""" + script = textwrap.dedent( + f""" + import asyncio + import os + from dataclasses import dataclass + + import dbzero as db0 + + from statek.agents.agent import Agent + from statek.exceptions import FutureError + from statek.executors.job import Job, JobDef, JobStatus + from statek.executors.utils import run_job_step + from statek.future import FutureResult + from statek.prompt_config import make_system_prompt + + db_path = {str(tmp_path / "db0")!r} + os.mkdir(db_path) + db0.init(db_path, read_write=True) + db0.open("test_prefix", "rw") + + @db0.memo + @dataclass + class MemoObject: + value: int = 0 + + def check_condition_false(_): + return False + + def fetch_result_not_ready(future_result): + raise FutureError(future_result=future_result) + + future = FutureResult(deps=MemoObject(value=0), state_num=0) + future.set_complement_functions( + complement=fetch_result_not_ready, + condition=check_condition_false, + ) + + agent = Agent( + role="test", + _system_prompt=make_system_prompt("Test"), + _metadata={{"MODEL": "test-model"}}, + _tools=[], + ) + job_def = JobDef( + agent=agent, + metadata={{"MODEL": "test-model"}}, + warmup_code=[ + "counter = counter + 1\\n" + "before_flag = True\\n" + "result = future_val\\n" + "after_flag = True", + 'exit("ok")', + ], + ) + job = Job( + job_def=job_def, + model_family="test", + model="test-model", + job_status=JobStatus.READY, + ) + job.py_env.local_state["counter"] = 0 + job.py_env.local_state["before_flag"] = False + job.py_env.local_state["after_flag"] = False + job.py_env.local_state["future_val"] = future + + result = asyncio.run(run_job_step(job)) + assert result is False + assert job.status == JobStatus.WARMING_UP + assert job.awaited_result is future + assert job.next_instr_num == 2 + + db0.close() + """ + ) + + env = os.environ.copy() + env["PYTHONDONTWRITEBYTECODE"] = "1" + env["PYTHONPATH"] = "/src/statek" + os.pathsep + env.get("PYTHONPATH", "") + + result = subprocess.run( + [sys.executable, "-c", script], + check=False, + env=env, + text=True, + capture_output=True, + ) + + assert result.returncode == 0, ( + f"subprocess exited with {result.returncode}\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) diff --git a/src/dbzero/core/collections/vector/v_sorted_vector.hpp b/src/dbzero/core/collections/vector/v_sorted_vector.hpp index 9515d0ea..f6ea765b 100644 --- a/src/dbzero/core/collections/vector/v_sorted_vector.hpp +++ b/src/dbzero/core/collections/vector/v_sorted_vector.hpp @@ -871,8 +871,8 @@ DB0_PACKED_END */ bool updateExisting(const data_t &data, data_t *old_data = nullptr) { - auto it = (*this)->find(data); - if (it == (*this)->end()) { + auto it = find(data); + if (it == end()) { return false; } @@ -887,8 +887,8 @@ DB0_PACKED_END bool findOne(data_t &data) const { - auto it = (*this)->find(data); - if (it == (*this)->end()) { + auto it = find(data); + if (it == end()) { return false; } diff --git a/src/dbzero/object_model/object/Object.cpp b/src/dbzero/object_model/object/Object.cpp index 37ec57fd..9be4cf34 100644 --- a/src/dbzero/object_model/object/Object.cpp +++ b/src/dbzero/object_model/object/Object.cpp @@ -290,6 +290,18 @@ namespace db0::object_model return m_kv_index.get(); } + + void Object::syncKVIndexRef(KV_Index *kv_index_ptr) + { + assert(kv_index_ptr); + auto address = kv_index_ptr->getAddress(); + auto type = kv_index_ptr->getIndexType(); + if ((*this)->m_kv_address != address || (*this)->m_kv_type != type) { + auto &object = this->modify(); + object.m_kv_address = address; + object.m_kv_type = type; + } + } void Object::addToKVIndex(FixtureLock &fixture, FieldID field_id, unsigned int fidelity, StorageClass storage_class, Value value) @@ -309,18 +321,10 @@ namespace db0::object_model lofi_store<2>::fromValue(kv_value).set(field_id.getOffset(), value.m_store); xvalue.m_value = kv_value; kv_index_ptr->updateExisting(xvalue); - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } else { - if (kv_index_ptr->insert(xvalue)) { - // type or address of the kv-index has changed which needs to be reflected - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + kv_index_ptr->insert(xvalue); + syncKVIndexRef(kv_index_ptr); } } @@ -345,21 +349,10 @@ namespace db0::object_model // mark as deleted in kv-index xvalue.m_type = StorageClass::DELETED; kv_index_ptr->updateExisting(xvalue); - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } else { - auto old_addr = kv_index_ptr->getAddress(); kv_index_ptr->erase(xvalue); - auto new_addr = kv_index_ptr->getAddress(); - if (new_addr != old_addr) { - // type or address of the kv-index has changed which needs to be reflected - this->modify().m_kv_address = new_addr; - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } m_type->removeFromSchema(field_id, fidelity, getSchemaTypeId(xvalue.m_type)); } else { @@ -378,12 +371,7 @@ namespace db0::object_model } xvalue.m_value = value; kv_index_ptr->updateExisting(xvalue); - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); m_type->removeFromSchema(field_id, fidelity, old_type_id); } @@ -447,18 +435,10 @@ namespace db0::object_model auto new_type_id = getSchemaTypeId(storage_class, value); m_type->updateSchema(field_id, fidelity, old_type_id, new_type_id); } - // in case of the IttyIndex updating an element changes the address/type - // which needs to be updated in the object - if (kv_index_ptr->getIndexType() == bindex::type::itty) { - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + syncKVIndexRef(kv_index_ptr); } else { - if (kv_index_ptr->insert(xvalue)) { - // type or address of the kv-index has changed which needs to be reflected - this->modify().m_kv_address = kv_index_ptr->getAddress(); - this->modify().m_kv_type = kv_index_ptr->getIndexType(); - } + kv_index_ptr->insert(xvalue); + syncKVIndexRef(kv_index_ptr); m_type->addToSchema(field_id, fidelity, getSchemaTypeId(storage_class, value)); } diff --git a/src/dbzero/object_model/object/Object.hpp b/src/dbzero/object_model/object/Object.hpp index d3968b73..0e3c98e3 100644 --- a/src/dbzero/object_model/object/Object.hpp +++ b/src/dbzero/object_model/object/Object.hpp @@ -88,6 +88,7 @@ namespace db0::object_model KV_Index *addKV_First(const XValue &); KV_Index *tryGetKV_Index() const; + void syncKVIndexRef(KV_Index *); bool hasKV_Index() const;