Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbzero/dbzero/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def init(dbzero_root: str, **kwargs) -> None:
"""

init_kwargs = {}

config_keys = ("autocommit", "autocommit_interval", "cache_size", "lang_cache_size", "suppress_dist_overflow_error")
config_keys = ("autocommit", "autocommit_interval", "cache_size", "lang_cache_size")
config = {}
for key in config_keys:
if key in kwargs:
Expand Down
8 changes: 6 additions & 2 deletions python_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ def db0_fixture(request):
db0.init(
DB0_DIR
)
db0.open("my-test-prefix")
yield db0
db0.open(
"my-test-prefix",
# use custom page_io_step_size if specified in request.param
page_io_step_size=__extract_param(request, "page_io_step_size", None)
)
yield db0
gc.collect()
db0.close()
if os.path.exists(DB0_DIR):
Expand Down
70 changes: 69 additions & 1 deletion python_tests/test_issues_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import dbzero as db0
import random
import string
from .memo_test_types import MemoTestClass, DynamicDataSingleton, MemoScopedSingleton
import time
import multiprocessing
from .memo_test_types import MemoTestClass, MemoTestSingleton
from .conftest import DB0_DIR


Expand Down Expand Up @@ -138,4 +140,70 @@ def test_db0_commit_close_issue_1(db0_fixture):

db0.init(DB0_DIR)
db0.open(prefix.name, "rw")


def make_small_update(px_name, expected_values):
    """
    Writer-side helper, used as a multiprocessing.Process target.

    Opens the prefix `px_name` in read/write mode, creates a tagged
    MemoTestClass instance holding expected_values[0] and commits it,
    then overwrites the value with expected_values[1] and closes dbzero.
    The pauses give a concurrent reader time to observe each step.
    """
    # let the parent process re-open the prefix before the first write
    time.sleep(0.25)
    db0.init(DB0_DIR)
    db0.open(px_name, "rw")

    # first transaction: new tagged object carrying the first value
    memo_obj = MemoTestClass(expected_values[0])
    db0.tags(memo_obj).add("tag")
    db0.commit()

    time.sleep(0.25)
    debug_build = 'D' in db0.build_flags()
    if debug_build:
        db0.dbg_start_logs()

    # second update: no explicit commit here, db0.close() follows immediately
    # NOTE(review): presumably close() persists the pending change - confirm
    memo_obj.value = expected_values[1]
    db0.close()


@pytest.mark.parametrize("db0_slab_size", [{"slab_size": 1 << 20}], indirect=True)
def test_refresh_issue1(db0_slab_size):
    """
    Issue: process blocked on refresh attempt
    Reason: missing SparsePair.commit() call when finishing a transaction

    Scenario: fill the current prefix with 10000 strings of varying length,
    close it, then start a separate writer process (make_small_update) and
    re-open the prefix read-only. The reader must observe two consecutive
    state changes via db0.refresh(), each within a bounded number of attempts,
    and see the expected value after each of them.
    """
    px_name = db0.get_current_prefix().name
    expected_values = ["first string", "second string"]

    # hard-coded list of string lengths (kept fixed so the run is reproducible)
    rand_ints = [350, 480, 343, 475, 871, 493, 550, 723, 342, 236, 110, 585, 633, 54, 797, 478, 850, 716, 1021,
                 136, 248, 879, 151, 249, 15, 717, 773, 625, 738, 731, 955, 280, 208, 730, 754, 982, 281, 221,
                 549, 501, 282, 307, 551, 472, 509, 761, 78, 735, 744, 450, 388, 645, 577, 706, 417, 78, 849,
                 873, 904, 534, 945, 985, 431, 725, 826, 49, 64, 766, 32, 460, 971, 766, 390, 990, 899, 835,
                 16, 570, 190, 573, 54, 642, 840, 817, 924, 793, 634, 889, 835, 250, 676, 1006, 819, 322,
                 373, 278, 895, 767, 380, 442]

    root = MemoTestSingleton([])
    for n in range(10000):
        # cycle through the length table, wrapping around at its end
        root.value.append("A" * rand_ints[n % len(rand_ints)])
    db0.close()
    time.sleep(1)
    p = multiprocessing.Process(target=make_small_update,
                                args=(px_name, expected_values))
    p.start()

    db0.init(DB0_DIR)
    db0.open(px_name, "r")

    for i in range(2):
        state_num = db0.get_state_num(px_name)
        # refresh until the next transaction is detected (2 in total),
        # giving up after 30 attempts per transaction
        max_repeat = 30
        if i == 1 and 'D' in db0.build_flags():
            db0.dbg_start_logs()

        while db0.get_state_num(px_name) == state_num:
            assert max_repeat > 0
            db0.refresh()
            time.sleep(0.1)
            max_repeat -= 1
        assert next(iter(db0.find(MemoTestClass))).value == expected_values[i]

    p.join()


47 changes: 47 additions & 0 deletions python_tests/test_page_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import dbzero as db0
from .memo_test_types import MemoTestClass, MemoTestSingleton
from .conftest import DB0_DIR


def test_create_prefix_with_page_io_step_size(db0_fixture):
    """
    Create a new prefix with a 16 MB page I/O step size and check that the
    prefix size exceeds one step after the first batch of commits, and
    grows by less than 128 KB while a second batch is appended.
    """
    # use 16 MB page I/O step size
    step_size = 16 << 20
    db0.open("some-new-prefix", "rw", page_io_step_size=step_size)
    objects = []

    def append_and_commit(count):
        for _ in range(count):
            objects.append(MemoTestClass("a" * 1024))  # 1 KB string
            # commit after each append
            db0.commit()

    append_and_commit(50)
    size_after_first_batch = db0.get_storage_stats()["prefix_size"]
    assert size_after_first_batch > step_size

    # after adding more pages, prefix size should not increase until next step is reached
    append_and_commit(50)
    size_after_second_batch = db0.get_storage_stats()["prefix_size"]
    growth = size_after_second_batch - size_after_first_batch
    assert growth < (128 << 10)


def test_continue_append_with_step_size(db0_fixture):
    """
    Create a prefix with a 16 MB page I/O step size, append to it, then
    close and re-open the same prefix without passing the step size and
    keep appending. Finally check the reported prefix size exceeds 32 MB.
    """
    prefix_name = "some-new-prefix"
    payload = "a" * 1024  # 1 KB string

    db0.open(prefix_name, "rw", page_io_step_size=16 << 20)
    singleton = MemoTestSingleton([])
    for _ in range(50):
        singleton.value.append(MemoTestClass(payload))
        db0.commit()

    print("--- before close ---")
    db0.close()
    db0.init(DB0_DIR)
    # NOTE: we're opening an existing prefix with already initialized page I/O step size
    db0.open(prefix_name, "rw")
    singleton = db0.fetch(MemoTestSingleton)
    for _ in range(250):
        singleton.value.append(MemoTestClass(payload))
        db0.commit()

    # NOTE: this behavior will change after we implement REL_Index
    prefix_size = db0.get_storage_stats()["prefix_size"]
    assert prefix_size > (32 << 20)

75 changes: 9 additions & 66 deletions python_tests/test_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
from .conftest import DB0_DIR
from .memo_test_types import DynamicDataClass, DynamicDataSingleton, MemoTestClass, MemoTestSingleton

# NOTE: all tests in this module are run twice
# to verify that refresh works correctly with custom page_io_step_size.
# The "db0_fixture" argument is parametrized indirectly (indirect=True):
# once with an empty dict and once with page_io_step_size = 16 << 10 (16 KiB).

pytestmark = pytest.mark.parametrize("db0_fixture", [
{}, # default parameters
{"page_io_step_size": 16 << 10} # with custom page_io_step_size
], indirect=True)


@db0.memo(singleton=True)
class MemoClassX:
Expand Down Expand Up @@ -471,68 +479,6 @@ def make_trasaction(n):
p.terminate()
p.join()

def make_small_update(px_name, expected_values):
    """
    Writer-side helper, used as a multiprocessing.Process target.

    Opens the prefix `px_name` in read/write mode, creates a tagged
    MemoTestClass instance holding expected_values[0] and commits it,
    then overwrites the value with expected_values[1] and closes dbzero.
    The pauses give a concurrent reader time to observe each step.
    """
    # let the parent process re-open the prefix before the first write
    time.sleep(0.25)
    db0.init(DB0_DIR)
    db0.open(px_name, "rw")

    # first transaction: new tagged object carrying the first value
    memo_obj = MemoTestClass(expected_values[0])
    db0.tags(memo_obj).add("tag")
    db0.commit()

    time.sleep(0.25)
    debug_build = 'D' in db0.build_flags()
    if debug_build:
        db0.dbg_start_logs()

    # second update: no explicit commit here, db0.close() follows immediately
    # NOTE(review): presumably close() persists the pending change - confirm
    memo_obj.value = expected_values[1]
    db0.close()

@pytest.mark.parametrize("db0_slab_size", [{"slab_size": 1 * 1024 * 1024}], indirect=True)
def test_refresh_issue1(db0_slab_size):
    """
    Issue: process blocked on refresh attempt
    Reason: missing SparsePair.commit() call when finishing a transaction

    Scenario: fill the current prefix with 10000 strings of varying length,
    close it, then start a separate writer process (make_small_update) and
    re-open the prefix read-only. The reader must observe two consecutive
    state changes via db0.refresh(), each within a bounded number of attempts,
    and see the expected value after each of them.
    """
    px_name = db0.get_current_prefix().name
    expected_values = ["first string", "second string"]

    # hard-coded list of string lengths (kept fixed so the run is reproducible)
    rand_ints = [350, 480, 343, 475, 871, 493, 550, 723, 342, 236, 110, 585, 633, 54, 797, 478, 850, 716, 1021,
                 136, 248, 879, 151, 249, 15, 717, 773, 625, 738, 731, 955, 280, 208, 730, 754, 982, 281, 221,
                 549, 501, 282, 307, 551, 472, 509, 761, 78, 735, 744, 450, 388, 645, 577, 706, 417, 78, 849,
                 873, 904, 534, 945, 985, 431, 725, 826, 49, 64, 766, 32, 460, 971, 766, 390, 990, 899, 835,
                 16, 570, 190, 573, 54, 642, 840, 817, 924, 793, 634, 889, 835, 250, 676, 1006, 819, 322,
                 373, 278, 895, 767, 380, 442]

    root = MemoTestSingleton([])
    for n in range(10000):
        # cycle through the length table, wrapping around at its end
        root.value.append("A" * rand_ints[n % len(rand_ints)])
    db0.close()
    time.sleep(1)
    p = multiprocessing.Process(target=make_small_update,
                                args=(px_name, expected_values))
    p.start()

    db0.init(DB0_DIR)
    db0.open(px_name, "r")

    for i in range(2):
        state_num = db0.get_state_num(px_name)
        # refresh until the next transaction is detected (2 in total),
        # giving up after 30 attempts per transaction
        max_repeat = 30
        if i == 1 and 'D' in db0.build_flags():
            db0.dbg_start_logs()

        while db0.get_state_num(px_name) == state_num:
            assert max_repeat > 0
            db0.refresh()
            time.sleep(0.1)
            max_repeat -= 1
        assert next(iter(db0.find(MemoTestClass))).value == expected_values[i]

    p.join()

def writer_process(prefix, writer_sem, reader_sem):
db0.init(DB0_DIR)
Expand All @@ -559,8 +505,6 @@ async def test_async_wait_for_updates(db0_fixture):
db0.commit()
db0.close()



writer_sem = multiprocessing.Semaphore(0)
reader_sem = multiprocessing.Semaphore(0)

Expand All @@ -570,8 +514,7 @@ async def test_async_wait_for_updates(db0_fixture):

db0.init(DB0_DIR)
db0.open(prefix, "r")



# Start waiting before transactions complete
current_num = db0.get_state_num(prefix)
make_trasaction(writer_sem, 5)
Expand Down
37 changes: 24 additions & 13 deletions src/dbzero/bindings/python/PyAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,18 @@ namespace db0::python
{
// prefix_name, open_mode, autocommit (bool)
static const char *kwlist[] = {
"prefix_name", "open_mode", "autocommit", "slab_size", "lock_flags", "meta_io_step_size", NULL
"prefix_name", "open_mode", "autocommit", "slab_size", "lock_flags", "meta_io_step_size",
"page_io_step_size", NULL
};
const char *prefix_name = nullptr;
const char *open_mode = nullptr;
PyObject *py_autocommit = nullptr;
PyObject *py_slab_size = nullptr;
PyObject *py_lock_flags = nullptr;
PyObject *py_meta_io_step_size = nullptr;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|sOOOO:open", const_cast<char**>(kwlist),
&prefix_name, &open_mode, &py_autocommit, &py_slab_size, &py_lock_flags, &py_meta_io_step_size))
PyObject *py_page_io_step_size = nullptr;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|sOOOOO:open", const_cast<char**>(kwlist),
&prefix_name, &open_mode, &py_autocommit, &py_slab_size, &py_lock_flags, &py_meta_io_step_size, &py_page_io_step_size))
{
return NULL;
}
Expand All @@ -200,14 +202,13 @@ namespace db0::python
if (py_autocommit && PyLong_Check(py_autocommit)) {
autocommit_interval = PyLong_AsLong(py_autocommit);
}

std::optional<std::size_t> slab_size;
if (py_slab_size && !PyLong_Check(py_slab_size)) {
PyErr_SetString(PyExc_TypeError, "Invalid argument type: slab_size");
return NULL;
}

if (py_slab_size) {
if (py_slab_size && py_slab_size != Py_None) {
if (!PyLong_Check(py_slab_size)) {
PyErr_SetString(PyExc_TypeError, "Invalid argument type: slab_size");
return NULL;
}
slab_size = PyLong_AsUnsignedLong(py_slab_size);
}

Expand All @@ -225,21 +226,31 @@ namespace db0::python
}

std::optional<std::size_t> meta_io_step_size;
if (py_meta_io_step_size) {
std::optional<std::size_t> page_io_step_size;
if (py_meta_io_step_size && py_meta_io_step_size != Py_None) {
if (!PyLong_Check(py_meta_io_step_size)) {
PyErr_SetString(PyExc_TypeError, "Invalid argument type: meta_io_step_size");
return NULL;
}
meta_io_step_size = PyLong_AsUnsignedLong(py_meta_io_step_size);
}

// check for None (default)
if (py_page_io_step_size && py_page_io_step_size != Py_None) {
if (!PyLong_Check(py_page_io_step_size)) {
PyErr_SetString(PyExc_TypeError, "Invalid argument type: page_io_step_size");
return NULL;
}
page_io_step_size = PyLong_AsUnsignedLong(py_page_io_step_size);
}

auto access_type = open_mode ? parseAccessType(open_mode) : db0::AccessType::READ_WRITE;
PyToolkit::getPyWorkspace().open(
prefix_name, access_type, autocommit, slab_size, py_lock_flags, meta_io_step_size
prefix_name, access_type, autocommit, slab_size, py_lock_flags, meta_io_step_size, page_io_step_size
);
Py_RETURN_NONE;
}

PyObject *PyAPI_open(PyObject *self, PyObject *args, PyObject *kwargs)
{
PY_API_FUNC
Expand Down
9 changes: 5 additions & 4 deletions src/dbzero/bindings/python/PyWorkspace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ namespace db0::python
}

// Opens the prefix `prefix_name` by forwarding the call to the underlying workspace.
// If no workspace has been created yet, one is initialized first with an empty path.
// When `py_lock_flags` is non-null it is wrapped in a db0::Config and passed along;
// otherwise an empty ({}) argument is passed in its place.
// All other arguments are forwarded unchanged.
void PyWorkspace::open(const std::string &prefix_name, AccessType access_type, std::optional<bool> autocommit,
std::optional<std::size_t> slab_size, ObjectPtr py_lock_flags, std::optional<std::size_t> meta_io_step_size)
std::optional<std::size_t> slab_size, ObjectPtr py_lock_flags, std::optional<std::size_t> meta_io_step_size,
std::optional<std::size_t> page_io_step_size)
{
if (!m_workspace) {
// initialize dbzero with current working directory
initWorkspace("");
}

// lock flags supplied from Python: convert to db0::Config before forwarding
if (py_lock_flags) {
db0::Config lock_flags_config(py_lock_flags);
m_workspace->open(prefix_name, access_type, autocommit, slab_size,
lock_flags_config, meta_io_step_size
lock_flags_config, meta_io_step_size, page_io_step_size
);
} else {
// no lock flags given: pass an empty ({}) argument instead
m_workspace->open(prefix_name, access_type, autocommit, slab_size,
{}, meta_io_step_size
{}, meta_io_step_size, page_io_step_size
);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/dbzero/bindings/python/PyWorkspace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ namespace db0::python
* Opens a specific prefix for read or read/write
* a newly opened read/write prefix becomes the default one
* @param slab_size will only take effect for newly created prefixes
* @param page_io_step_size parameter only respected for newly created prefixes
*/
void open(const std::string &prefix_name, AccessType, std::optional<bool> autocommit = {},
std::optional<std::size_t> slab_size = {}, ObjectPtr lock_flags = nullptr,
std::optional<std::size_t> meta_io_step_size = {}
std::optional<std::size_t> meta_io_step_size = {}, std::optional<std::size_t> page_io_step_size = {}
);

db0::Workspace &getWorkspace() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ DB0_PACKED_END
// Returns the size reported by the base class (forwards to super_t::size()).
std::size_t size() const {
return super_t::size();
}

// Forwards to the base-class commit (super_t::commit()).
void commit() {
void commit() const {
super_t::commit();
}

Expand Down
Loading