Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions python_tests/test_issues_16.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# SPDX-License-Identifier: LGPL-2.1-or-later
# Copyright (c) 2025 DBZero Software sp. z o.o.

import os
import subprocess
import sys
import textwrap


def test_repeated_memo_graph_creation_with_tagged_shared_lookup_does_not_segfault(tmp_path):
"""Regression for the statek Job construction segfault."""
script = textwrap.dedent(
f"""
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional
import os

import dbzero as db0

db_path = {str(tmp_path / "db0")!r}
os.mkdir(db_path)
db0.init(db_path)
db0.open("p")

JobStatus = db0.enum("Issue16JobStatus", ["READY", "DONE"])

@db0.memo
@dataclass
class Pricing:
input_price_per_M: Optional[Decimal] = None
input_price_per_cached_M: Optional[Decimal] = None
output_price_per_M: Optional[Decimal] = None

@db0.memo(no_default_tags=True)
@dataclass
class Usage:
pricing: Pricing
context_bytes: int = 0
total_bytes_sent: int = 0
total_bytes_received: int = 0
total_input_tokens: int = 0
total_cached_tokens: int = 0
total_output_tokens: int = 0
total_reported_cost: Optional[float] = None

@db0.memo
class RuntimeState:
def __init__(self):
self.local_state = {{}}
self.console = []
self.exceptions = None
self.exit_status = None

@db0.memo
class Agent:
def __init__(self, role, metadata, tools):
self.role = role
self._metadata = metadata
self._tools = tools

@db0.memo
@dataclass
class JobDef:
agent: Agent
metadata: dict = None
job_params: dict = None
warmup_code: object = None

def __post_init__(self):
if self.agent is not None:
db0.tags(self).add(self.agent)
if self.metadata is None:
self.metadata = self.agent._metadata

@db0.memo
class Job:
def __init__(self, job_def, job_status=JobStatus.READY):
self.job_def = job_def
self.parent_job = None
if self.job_def.agent is not None:
db0.tags(self).add(self.job_def.agent)
self.__job_status = None
self.set_status(job_status)
self.runtime_state = RuntimeState()
self.chat_log = []
self.awaited_result = None
self.next_instr_num = None
self.warmup_block_num = None
self.error = None
self.created_at = None
self.error_handlers = []
self.__last_difficulty = None
self.usage = Usage(pricing=self.current_pricing())
self.error = None
self.num_completions = None
self.__ext_ref = None
self.__pending_chat_log = []

def set_status(self, new_status):
if self.__job_status is not None:
db0.tags(self).remove(self.__job_status)
db0.tags(self).add(new_status)
self.__job_status = new_status

def current_pricing(self):
existing = next(iter(db0.find(Pricing, "UNKNOWN", "test-model")), None)
if existing is not None:
return existing
pricing = Pricing()
db0.tags(pricing).add(["UNKNOWN", "test-model", "USAGE"])
return pricing

def make_job():
agent = Agent("test", {{"MODEL": "test-model"}}, [])
job_def = JobDef(agent)
return Job(job_def)

make_job()
make_job()
db0.close()
"""
)

env = os.environ.copy()
env["PYTHONDONTWRITEBYTECODE"] = "1"
result = subprocess.run(
[sys.executable, "-c", script],
check=False,
env=env,
text=True,
capture_output=True,
)

assert result.returncode == 0, (
f"subprocess exited with {result.returncode}\\n"
f"stdout:\\n{result.stdout}\\n"
f"stderr:\\n{result.stderr}"
)
103 changes: 103 additions & 0 deletions python_tests/test_issues_17.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# SPDX-License-Identifier: LGPL-2.1-or-later
# Copyright (c) 2025 DBZero Software sp. z o.o.

import os
import subprocess
import sys
import textwrap


def test_statek_future_error_warmup_keeps_job_status_readable(tmp_path):
"""Focused repro for the statek FutureError warmup failure."""
script = textwrap.dedent(
f"""
import asyncio
import os
from dataclasses import dataclass

import dbzero as db0

from statek.agents.agent import Agent
from statek.exceptions import FutureError
from statek.executors.job import Job, JobDef, JobStatus
from statek.executors.utils import run_job_step
from statek.future import FutureResult
from statek.prompt_config import make_system_prompt

db_path = {str(tmp_path / "db0")!r}
os.mkdir(db_path)
db0.init(db_path, read_write=True)
db0.open("test_prefix", "rw")

@db0.memo
@dataclass
class MemoObject:
value: int = 0

def check_condition_false(_):
return False

def fetch_result_not_ready(future_result):
raise FutureError(future_result=future_result)

future = FutureResult(deps=MemoObject(value=0), state_num=0)
future.set_complement_functions(
complement=fetch_result_not_ready,
condition=check_condition_false,
)

agent = Agent(
role="test",
_system_prompt=make_system_prompt("Test"),
_metadata={{"MODEL": "test-model"}},
_tools=[],
)
job_def = JobDef(
agent=agent,
metadata={{"MODEL": "test-model"}},
warmup_code=[
"counter = counter + 1\\n"
"before_flag = True\\n"
"result = future_val\\n"
"after_flag = True",
'exit("ok")',
],
)
job = Job(
job_def=job_def,
model_family="test",
model="test-model",
job_status=JobStatus.READY,
)
job.py_env.local_state["counter"] = 0
job.py_env.local_state["before_flag"] = False
job.py_env.local_state["after_flag"] = False
job.py_env.local_state["future_val"] = future

result = asyncio.run(run_job_step(job))
assert result is False
assert job.status == JobStatus.WARMING_UP
assert job.awaited_result is future
assert job.next_instr_num == 2

db0.close()
"""
)

env = os.environ.copy()
env["PYTHONDONTWRITEBYTECODE"] = "1"
env["PYTHONPATH"] = "/src/statek" + os.pathsep + env.get("PYTHONPATH", "")

result = subprocess.run(
[sys.executable, "-c", script],
check=False,
env=env,
text=True,
capture_output=True,
)

assert result.returncode == 0, (
f"subprocess exited with {result.returncode}\n"
f"stdout:\n{result.stdout}\n"
f"stderr:\n{result.stderr}"
)
8 changes: 4 additions & 4 deletions src/dbzero/core/collections/vector/v_sorted_vector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,8 @@ DB0_PACKED_END
*/
bool updateExisting(const data_t &data, data_t *old_data = nullptr)
{
auto it = (*this)->find(data);
if (it == (*this)->end()) {
auto it = find(data);
if (it == end()) {
return false;
}

Expand All @@ -887,8 +887,8 @@ DB0_PACKED_END

bool findOne(data_t &data) const
{
auto it = (*this)->find(data);
if (it == (*this)->end()) {
auto it = find(data);
if (it == end()) {
return false;
}

Expand Down
62 changes: 21 additions & 41 deletions src/dbzero/object_model/object/Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ namespace db0::object_model

return m_kv_index.get();
}

void Object::syncKVIndexRef(KV_Index *kv_index_ptr)
{
assert(kv_index_ptr);
auto address = kv_index_ptr->getAddress();
auto type = kv_index_ptr->getIndexType();
if ((*this)->m_kv_address != address || (*this)->m_kv_type != type) {
auto &object = this->modify();
object.m_kv_address = address;
object.m_kv_type = type;
}
}

void Object::addToKVIndex(FixtureLock &fixture, FieldID field_id, unsigned int fidelity,
StorageClass storage_class, Value value)
Expand All @@ -309,18 +321,10 @@ namespace db0::object_model
lofi_store<2>::fromValue(kv_value).set(field_id.getOffset(), value.m_store);
xvalue.m_value = kv_value;
kv_index_ptr->updateExisting(xvalue);
// in case of the IttyIndex updating an element changes the address/type
// which needs to be updated in the object
if (kv_index_ptr->getIndexType() == bindex::type::itty) {
this->modify().m_kv_address = kv_index_ptr->getAddress();
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
syncKVIndexRef(kv_index_ptr);
} else {
if (kv_index_ptr->insert(xvalue)) {
// type or address of the kv-index has changed which needs to be reflected
this->modify().m_kv_address = kv_index_ptr->getAddress();
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
kv_index_ptr->insert(xvalue);
syncKVIndexRef(kv_index_ptr);
}
}

Expand All @@ -345,21 +349,10 @@ namespace db0::object_model
// mark as deleted in kv-index
xvalue.m_type = StorageClass::DELETED;
kv_index_ptr->updateExisting(xvalue);
// in case of the IttyIndex updating an element changes the address/type
// which needs to be updated in the object
if (kv_index_ptr->getIndexType() == bindex::type::itty) {
this->modify().m_kv_address = kv_index_ptr->getAddress();
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
syncKVIndexRef(kv_index_ptr);
} else {
auto old_addr = kv_index_ptr->getAddress();
kv_index_ptr->erase(xvalue);
auto new_addr = kv_index_ptr->getAddress();
if (new_addr != old_addr) {
// type or address of the kv-index has changed which needs to be reflected
this->modify().m_kv_address = new_addr;
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
syncKVIndexRef(kv_index_ptr);
}
m_type->removeFromSchema(field_id, fidelity, getSchemaTypeId(xvalue.m_type));
} else {
Expand All @@ -378,12 +371,7 @@ namespace db0::object_model
}
xvalue.m_value = value;
kv_index_ptr->updateExisting(xvalue);
// in case of the IttyIndex updating an element changes the address/type
// which needs to be updated in the object
if (kv_index_ptr->getIndexType() == bindex::type::itty) {
this->modify().m_kv_address = kv_index_ptr->getAddress();
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
syncKVIndexRef(kv_index_ptr);

m_type->removeFromSchema(field_id, fidelity, old_type_id);
}
Expand Down Expand Up @@ -447,18 +435,10 @@ namespace db0::object_model
auto new_type_id = getSchemaTypeId(storage_class, value);
m_type->updateSchema(field_id, fidelity, old_type_id, new_type_id);
}
// in case of the IttyIndex updating an element changes the address/type
// which needs to be updated in the object
if (kv_index_ptr->getIndexType() == bindex::type::itty) {
this->modify().m_kv_address = kv_index_ptr->getAddress();
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
syncKVIndexRef(kv_index_ptr);
} else {
if (kv_index_ptr->insert(xvalue)) {
// type or address of the kv-index has changed which needs to be reflected
this->modify().m_kv_address = kv_index_ptr->getAddress();
this->modify().m_kv_type = kv_index_ptr->getIndexType();
}
kv_index_ptr->insert(xvalue);
syncKVIndexRef(kv_index_ptr);

m_type->addToSchema(field_id, fidelity, getSchemaTypeId(storage_class, value));
}
Expand Down
1 change: 1 addition & 0 deletions src/dbzero/object_model/object/Object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ namespace db0::object_model
KV_Index *addKV_First(const XValue &);

KV_Index *tryGetKV_Index() const;
void syncKVIndexRef(KV_Index *);

bool hasKV_Index() const;

Expand Down
Loading