Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbzero/dbzero/dbzero.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def load_dynamic(name, path):

def __bootstrap__():
global __bootstrap__, __loader__, __file__
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"]
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"]
__file__ = None
for path in paths:
if os.path.isdir(path):
Expand Down
18 changes: 9 additions & 9 deletions python_tests/test_copy_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
# restore the copy
os.rename(file_name, px_path)

print(f"--- Validating copy {copy_id}")
print(f"--- Validating copy {copy_id}", flush=True)
db0.init(DB0_DIR, prefix=px_name, read_write=False)
result = validate_current_prefix(expected_len, expected_min_len)
db0.close()
Expand All @@ -224,7 +224,7 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
epoch_count = 2
total_len = 0
for epoch in range(epoch_count):
print(f"=== Epoch {epoch} ===")
print(f"=== Epoch {epoch} ===", flush=True)
obj_count = 5000
commit_count = 100
# start the writer process for a long run
Expand All @@ -246,16 +246,16 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):

copy_id = 0
# copy the prefix multiple times while it is being modified
while True:
while True:
if not p.is_alive():
break
file_name = f"./test-copy-{copy_id}.db0"
if os.path.exists(file_name):
os.remove(file_name)
# copy prefix without opening it, use default step size
print("--- Copying prefix iteration", copy_id)
print("--- Copying prefix iteration", copy_id, flush=True)
db0.copy_prefix(file_name, prefix=px_name)
print("--- copy finished")
print("--- copy finished", flush=True)
copy_id += 1
if not p.is_alive():
break
Expand All @@ -265,8 +265,8 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
total_len += obj_count * commit_count

# validate original prefix (no copy yet)
# print("Validating final prefix ...", flush=True)
# validate_current_prefix(expected_len = total_len)
print("Validating final prefix ...", flush=True)
validate_current_prefix(expected_len = total_len)

# make final stale copy (i.e. without active modifications)
final_copy = f"./test-copy-final.db0"
Expand All @@ -275,11 +275,11 @@ def validate_copy(copy_id, expected_len = None, expected_min_len = None):
db0.copy_prefix(final_copy, prefix=px_name)
db0.close()

print("Validating all copies")
print("Validating all copies", flush=True)
validate_copy("final", expected_len = total_len)
for i in range(copy_id):
last_len = validate_copy(i, expected_min_len = last_len)
print(f"--- Copy {i} valid with {last_len} objects")
print(f"--- Copy {i} valid with {last_len} objects", flush=True)
# this is the restored version
total_len = last_len

Expand Down
141 changes: 141 additions & 0 deletions python_tests/test_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import asyncio
import dbzero as db0
import os
from .conftest import DB0_DIR
from .memo_test_types import DynamicDataClass, DynamicDataSingleton, MemoTestClass, MemoTestSingleton

Expand Down Expand Up @@ -552,3 +553,143 @@ async def test_async_wait_for_updates(db0_fixture):

p.terminate()
p.join()


def append_to_prefix(prefix, obj_count = 50, commit_count = 50, long_run = False):
db0.init(DB0_DIR)
db0.open(prefix, "rw")
# create new or open an existing root object
root = MemoTestSingleton([])
if (len(root.value) > 0):
print(f"Writer process: opened existing prefix with {len(root.value)} objects")
for i in range(commit_count):
for _ in range(obj_count):
root.value.append(MemoTestClass("b" * 1024)) # 1 KB string
db0.commit()
if long_run:
print(f"Writer process: committed {(i + 1) * obj_count} objects", flush=True)
else:
time.sleep(0.1)

if long_run:
print(db0.get_storage_stats(), flush=True)
db0.close()


@pytest.mark.stress_test
def test_refresh_prefix_continuous_process_with_snapshot(db0_fixture):
px_name = db0.get_current_prefix().name

def validate_current_prefix(expected_len = None, expected_min_len = None):
snap = db0.snapshot()
root = snap.fetch(MemoTestSingleton)
assert not expected_min_len or len(root.value) >= expected_min_len
assert not expected_len or len(root.value) == expected_len
for item in root.value:
assert item.value == "b" * 1024
return len(root.value)

db0.close()

# in each 'epoch' we modify prefix while making copies
# then drop the original prefix and restore if from the last copy
epoch_count = 2
total_len = 0
for epoch in range(epoch_count):
print(f"=== Epoch {epoch} ===")
obj_count = 5000
commit_count = 100
# start the writer process for a long run
p = multiprocessing.Process(target=append_to_prefix, args=(px_name, obj_count, commit_count, True))
p.start()

db0.init(DB0_DIR)
db0.open(px_name, "r")
last_len = 0
while True:
try:
root = db0.fetch(MemoTestSingleton)
if len(root.value) > 1:
last_len = len(root.value)
break
except Exception:
pass
time.sleep(0.1)

# validate prefix while writer is actively modifying it
while True:
if not p.is_alive():
break
print("--- Validate prefix iteration", flush=True)
last_len = validate_current_prefix(expected_min_len = last_len)
print(f"--- Prefix valid with {last_len} objects", flush=True)
if not p.is_alive():
break
time.sleep(2.5) # wait a bit before next copy

p.join()
total_len += obj_count * commit_count

print("Validating final prefix ...", flush=True)
validate_current_prefix(expected_len = total_len)
db0.close()


@pytest.mark.stress_test
@pytest.mark.skip(reason="Test disabled due to issue #605")
# test failing due to issue: https://github.com/dbzero-software/dbzero/issues/605
def test_refresh_prefix_continuous_process(db0_fixture):
px_name = db0.get_current_prefix().name

def validate_current_prefix(expected_len = None, expected_min_len = None):
root = db0.fetch(MemoTestSingleton)
assert not expected_min_len or len(root.value) >= expected_min_len
assert not expected_len or len(root.value) == expected_len
for item in root.value:
assert item.value == "b" * 1024
return len(root.value)

db0.close()

# in each 'epoch' we modify prefix while making copies
# then drop the original prefix and restore if from the last copy
epoch_count = 2
total_len = 0
for epoch in range(epoch_count):
print(f"=== Epoch {epoch} ===")
obj_count = 5000
commit_count = 100
# start the writer process for a long run
p = multiprocessing.Process(target=writer_process, args=(px_name, obj_count, commit_count, True))
p.start()

db0.init(DB0_DIR)
db0.open(px_name, "r")
last_len = 0
while True:
try:
root = db0.fetch(MemoTestSingleton)
if len(root.value) > 1:
last_len = len(root.value)
break
except Exception:
pass
time.sleep(0.1)

# validate prefix while writer is actively modifying it
while True:
if not p.is_alive():
break
print("--- Validate prefix iteration", flush=True)
last_len = validate_current_prefix(expected_min_len = last_len)
print(f"--- Prefix valid with {last_len} objects", flush=True)
if not p.is_alive():
break
time.sleep(2.5) # wait a bit before next copy

p.join()
total_len += obj_count * commit_count

print("Validating final prefix ...", flush=True)
validate_current_prefix(expected_len = total_len)
db0.close()
2 changes: 1 addition & 1 deletion src/dbzero/core/collections/full_text/FT_IndexIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ namespace db0
*/
}

template <typename bindex_t, typename key_t, typename IndexKeyT>
template <typename bindex_t, typename key_t, typename IndexKeyT>
typename FT_IndexIterator<bindex_t, key_t, IndexKeyT>::iterator &FT_IndexIterator<bindex_t, key_t, IndexKeyT>::getIterator()
{
if (m_is_detached) {
Expand Down
10 changes: 10 additions & 0 deletions src/dbzero/core/storage/ChangeLogIOStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ namespace db0
Writer(ChangeLogIOStream &);

void appendChangeLog(const o_change_log_t &);

template <typename... Args>
void appendChangeLog(const ChangeLogData &data, Args&&... args)
{
auto size_of = o_change_log_t::measure(data, std::forward<Args>(args)...);
std::vector<char> buffer(size_of);
auto &change_log = o_change_log_t::__new(buffer.data(), data, std::forward<Args>(args)...);
appendChangeLog(change_log);
}

void flush();

private:
Expand Down
Loading
Loading