Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbzero/dbzero/dbzero.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def load_dynamic(name, path):

def __bootstrap__():
global __bootstrap__, __loader__, __file__
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"]
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"]
__file__ = None
for path in paths:
if os.path.isdir(path):
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ build-backend = 'mesonpy'
requires = ['meson-python']

[project]
name = 'DBzero'
name = 'dbzero'
version = '0.1.0'
description = 'DBZero Community edition'
description = 'DBZero Community Edition'
readme = 'README.md'
requires-python = '>=3.8'
license = {file = 'LICENSE'}
Expand Down
214 changes: 155 additions & 59 deletions python_tests/test_copy_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ def test_copy_prefix_custom_step_size(db0_fixture):
def writer_process(prefix, obj_count = 50, commit_count = 50, long_run = False):
db0.init(DB0_DIR)
db0.open(prefix, "rw")
# create new or open an existing root object
root = MemoTestSingleton([])
if (len(root.value) > 0):
print(f"Writer process: opened existing prefix with {len(root.value)} objects")
for i in range(commit_count):
for _ in range(obj_count):
root.value.append(MemoTestClass("b" * 1024)) # 1 KB string
Expand All @@ -100,7 +103,7 @@ def writer_process(prefix, obj_count = 50, commit_count = 50, long_run = False):
print(f"Writer process: committed {i * obj_count} objects")
else:
time.sleep(0.1)

if long_run:
print(db0.get_storage_stats())
db0.commit()
Expand Down Expand Up @@ -216,72 +219,165 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
return result

db0.close()
obj_count = 5000
commit_count = 100
# start the writer process for a long run
p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True))
p.start()

db0.init(DB0_DIR)
db0.open(px_name, "r")
last_len = 0
while True:
try:
root = db0.fetch(MemoTestSingleton)
if len(root.value) > 1:
last_len = len(root.value)
# in each 'epoch' we modify prefix while making copies
# then drop the original prefix and restore if from the last copy
epoch_count = 2
for epoch in range(epoch_count):
print(f"=== Epoch {epoch} ===")
# obj_count = 5000
# commit_count = 100
obj_count = 500
commit_count = 100
# start the writer process for a long run
p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True))
p.start()

db0.init(DB0_DIR)
db0.open(px_name, "r")
last_len = 0
while True:
try:
root = db0.fetch(MemoTestSingleton)
if len(root.value) > 1:
last_len = len(root.value)
break
except Exception:
pass
time.sleep(0.1)

copy_id = 0
# copy the prefix multiple times while it is being modified
while True:
if not p.is_alive():
break
except Exception:
pass
time.sleep(0.1)
file_name = f"./test-copy-{copy_id}.db0"
if os.path.exists(file_name):
os.remove(file_name)
# copy prefix without opening it, use default step size
print("--- Copying prefix iteration", copy_id)
db0.copy_prefix(file_name, prefix=px_name)
print("--- copy finished")
copy_id += 1
if not p.is_alive():
break
time.sleep(2.5) # wait a bit before next copy

p.join()

# validate original prefix (no copy yet)
print("Validating final prefix ...")
db0.open(px_name, "r")
validate_current_prefix(expected_len = obj_count * commit_count)

# make final stale copy (i.e. without active modifications)
final_copy = f"./test-copy-final.db0"
if os.path.exists(final_copy):
os.remove(final_copy)
db0.copy_prefix(final_copy, prefix=px_name)
db0.close()

print("Validating all copies")
validate_copy("final", expected_len = obj_count * commit_count)
for i in range(copy_id):
last_len = validate_copy(i, expected_min_len = last_len)
print(f"--- Copy {i} valid with {last_len} objects")

# now, continue modifications starting from the last restored copy (making new copies)


def test_modify_copied_prefix(db0_fixture):
file_name = "./test-copy.db0"
# remove file if it exists
if os.path.exists(file_name):
os.remove(file_name)

px_name = db0.get_current_prefix().name
px_path = os.path.join(DB0_DIR, px_name + ".db0")
root = MemoTestSingleton([])
total_len = 0

copy_id = 0
# copy the prefix multiple times while it is being modified
while True:
if not p.is_alive():
break
file_name = f"./test-copy-{copy_id}.db0"
if os.path.exists(file_name):
os.remove(file_name)
# copy prefix without opening it, use default step size
print("--- Copying prefix iteration", copy_id)
db0.copy_prefix(file_name, prefix=px_name)
print("--- copy finished")
copy_id += 1
if not p.is_alive():
break
time.sleep(2.5) # wait a bit before next copy
def modify_prefix():
append_count = 0
root = db0.fetch(MemoTestSingleton)
for _ in range(50):
root.value.append(MemoTestClass("a" * 1024)) # 1 KB string
append_count += 1
db0.commit()
return append_count

p.join()
total_len += modify_prefix()
db0.copy_prefix(file_name)
db0.close()

# validate original prefix (no copy yet)
print("Validating final prefix ...")
db0.open(px_name, "r")
validate_current_prefix(expected_len = obj_count * commit_count)
# drop original file and replace with copy
os.remove(px_path)
os.rename(file_name, px_path)

# make final stale copy (i.e. without active modifications)
final_copy = f"./test-copy-final.db0"
if os.path.exists(final_copy):
os.remove(final_copy)
db0.copy_prefix(final_copy, prefix=px_name)
# open recovered prefix for update
db0.init(DB0_DIR, prefix=px_name, read_write=True)
total_len += modify_prefix()
db0.close()

# open prefix from recovered and modified copy
db0.init(DB0_DIR, prefix=px_name, read_write=False)
root = db0.fetch(MemoTestSingleton)
for item in root.value:
assert item.value == "a" * 1024
assert len(root.value) == total_len


def test_copy_prefix_of_recovered_copy(db0_fixture):
file_name = "./test-copy.db0"
# remove file if it exists
if os.path.exists(file_name):
os.remove(file_name)

px_name = db0.get_current_prefix().name
px_path = os.path.join(DB0_DIR, px_name + ".db0")
root = MemoTestSingleton([])
total_len = 0
charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

print("Validating all copies")
validate_copy("final", expected_len = obj_count * commit_count)
for i in range(copy_id):
last_len = validate_copy(i, expected_min_len = last_len)
print(f"--- Copy {i} valid with {last_len} objects")
def modify_prefix(op_count = 50):
append_count = 0
root = db0.fetch(MemoTestSingleton)
for _ in range(op_count):
c = charset[len(root.value) % len(charset)]
root.value.append(MemoTestClass(c * 1024)) # 1 KB string
append_count += 1
db0.commit()
return append_count

def test_copy_prefix_throws_on_path_passed(db0_fixture):
path = "./invalid-dir/nonexistent/-copy/"
# remove path if it exists
if os.path.exists(path):
os.rmdir(path)
def validate(expected_len):
root = db0.fetch(MemoTestSingleton)
for i, item in enumerate(root.value):
c = charset[i % len(charset)]
assert item.value == c * 1024
assert len(root.value) == expected_len

total_len += modify_prefix(150)
db0.copy_prefix(file_name, page_io_step_size=64 << 10)
db0.close()

# drop original file and replace with copy
os.remove(px_path)
os.rename(file_name, px_path)

root = MemoTestSingleton([])
for _ in range(50):
root.value.append(MemoTestClass("a" * 1024)) # 1 KB string
db0.commit()
# open recovered prefix for update
db0.init(DB0_DIR, prefix=px_name, read_write=True)
total_len += modify_prefix(100)

db0.close()
db0.init(DB0_DIR, prefix=px_name, read_write=True)
validate(total_len)
db0.copy_prefix(file_name)
db0.close()

with pytest.raises(OSError) as excinfo:
db0.copy_prefix(path)
# restore copy of a restored and modified copy
os.remove(px_path)
os.rename(file_name, px_path)

# open prefix from recovered and modified copy of a copy
db0.init(DB0_DIR, prefix=px_name, read_write=False)
validate(total_len)
9 changes: 5 additions & 4 deletions src/dbzero/bindings/python/PyInternalAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,13 +946,14 @@ namespace db0::python
PyErr_Format(PyExc_OSError, "Output file already exists: '%s'", output_file_name);
return nullptr;
}

// use either explicit step size, input step size (if > 1) or default = 4MB
if (!page_io_step_size) {
auto in_step_size = src_storage.getPageIO().getStepSize();
page_io_step_size = in_step_size > 1 ? in_step_size : (4u << 20);
auto &page_io = src_storage.getPageIO();
auto in_step_size = page_io.getStepSize();
page_io_step_size = in_step_size > 1 ? (in_step_size * page_io.getBlockSize()) : (4u << 20);
}

if (!meta_io_step_size) {
auto in_meta_step_size = src_storage.getMetaIO().getStepSize();
meta_io_step_size = in_meta_step_size > 1 ? in_meta_step_size : (1u << 20);
Expand Down
38 changes: 36 additions & 2 deletions src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ DB0_PACKED_END
using CompT = typename super_t::CompT;
using NodeItemCompT = typename super_t::NodeItemCompT;
using NodeItemEqualT = typename super_t::NodeItemEqualT;
using const_iterator = typename super_t::const_iterator;

// as null / invalid
SGB_CompressedLookupTree() = default;
Expand Down Expand Up @@ -274,7 +275,11 @@ DB0_PACKED_END
sg_tree_const_iterator cend_nodes() const {
return base_t::end();
}


bool empty() const {
return super_t::empty();
}

std::size_t size() const {
return super_t::size();
}
Expand Down Expand Up @@ -337,7 +342,7 @@ DB0_PACKED_END

return std::nullopt;
}

// Locate first element which is greater or equal to the key
template <typename KeyT> std::optional<ItemT> upper_equal_bound(const KeyT &key) const
{
Expand Down Expand Up @@ -423,6 +428,35 @@ DB0_PACKED_END
bool operator!() const {
return super_t::operator!();
}

class uncompressed_const_iterator: protected super_t::const_iterator
{
public:
uncompressed_const_iterator(const const_iterator &iterator)
: super_t::const_iterator(iterator)
{
}

bool is_end() const {
return super_t::const_iterator::is_end();
}

uncompressed_const_iterator &operator++()
{
super_t::const_iterator::operator++();
return *this;
}

ItemT operator*() const {
// return uncompressed item from the underlying iterator
return this->m_tree_it->header().uncompress(super_t::const_iterator::operator*());
}
};

// Begin sorted iteration over all items (uncompressed)
uncompressed_const_iterator cbegin() const {
return super_t::cbegin();
}

private:
ItemCompT m_raw_item_comp;
Expand Down
Loading
Loading