Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 1 addition & 69 deletions python_tests/test_storage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ class IndexContainer:

@db0.memo(singleton = True)
@dataclass
class IndexesSingleton:
# issuers keyed by NIP (Polish tax identification number)
class IndexesSingleton:
indexes: List[IndexContainer]


Expand All @@ -35,73 +34,6 @@ def format_results(diffs):
lines.append(line)
return "\n".join(lines)

@pytest.mark.stress_test
def test_io_operation_stability(db0_large_lang_cache_no_autocommit):
    """Stress test: repeatedly add values to many indexes and commit.

    Creates 250000 index containers, commits once, then runs 10 rounds of
    100000 ``index.add`` calls, each followed by a commit. For every round it
    prints the insertion throughput, the commit duration and the per-key
    difference in ``db0.get_storage_stats()`` relative to the previous round.
    The minimum and maximum commit durations are printed at the end.

    The test only reports measurements; it makes no assertions.
    """
    print("Initializing test data...")
    indexes = IndexesSingleton(indexes=[])
    # NOTE(review): BYTES is never read in this function; presumably it was
    # meant to feed `list_value` below - confirm.
    BYTES = "DB0"*2200
    diffs = []
    indexes_count = 250000
    for _ in range(indexes_count):
        indexes.indexes.append(IndexContainer(index=db0.index()))

    # commit init
    print("Performing initial commit...")
    start = datetime.now()
    db0.commit()
    stop = datetime.now()

    # total_seconds() keeps the sub-second part; timedelta.seconds would
    # truncate it and ignore the days component
    initial_commit_time = (stop - start).total_seconds()
    storage_stats = db0.get_storage_stats()
    min_commit_time = initial_commit_time
    max_commit_time = initial_commit_time
    print(f" Initial commit time seconds: {initial_commit_time}")
    print("Starting IO operation stability test...")

    for i in range(10):
        print(f" Iteration {i+1}/10")
        start = datetime.now()
        iterations = 100000
        # perform iteration
        for j in range(iterations):
            # walk the containers round-robin, continuing where the previous round stopped
            number = (i*iterations + j)%indexes_count
            index_container = indexes.indexes[number]
            now = datetime.now()
            # NOTE(review): `list_value` is not defined inside this function;
            # presumably a module-level name - confirm it exists.
            new_value = Value(index_number=number, date=now, value=list_value)
            index_container.index.add(now, new_value)

        # calculate objects per second
        stop = datetime.now()
        # timedelta has no `miliseconds` attribute; total_seconds() gives the elapsed time
        seconds = (stop - start).total_seconds()
        print(f" Objects per second: {iterations / seconds}")

        # commit changes
        start = datetime.now()
        db0.commit()
        stop = datetime.now()

        # measure commit time
        commit_time = (stop - start).total_seconds()
        min_commit_time = min(min_commit_time, commit_time)
        max_commit_time = max(max_commit_time, commit_time)
        print(f" Commit time seconds: {commit_time}")

        storage_stats_after = db0.get_storage_stats()
        # get storage stats difference (keys missing from the previous snapshot count as 0)
        diff = {}
        for key in storage_stats_after:
            diff[key] = storage_stats_after[key] - storage_stats.get(key, 0)
        print(f" Storage stats diff: {diff}")
        diffs.append(diff)
        storage_stats = storage_stats_after

    results = format_results(diffs)
    print(f"IO Operation Stability Test Results:\n{results}")
    print(f"Min commit time: {min_commit_time} seconds")
    print(f"Max commit time: {max_commit_time} seconds")


@pytest.mark.stress_test
def test_big_cache_should_prevent_random_reads(db0_large_lang_cache_no_autocommit):
Expand Down
5 changes: 5 additions & 0 deletions run_asan_stress_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Runs the pytest tests marked 'stress_test' with the AddressSanitizer runtime
# preloaded via LD_PRELOAD. Any extra arguments are forwarded to pytest.
export PYTHONIOENCODING=utf8

# Resolve the ASan runtime path before exporting it: `export VAR=$(cmd)` would
# hide a failing gcc call and the tests would then run without ASan preloaded.
asan_lib=$(gcc -print-file-name=libasan.so) || {
    echo "error: could not query gcc for the location of libasan.so" >&2
    exit 1
}
export LD_PRELOAD="$asan_lib"

python3 -m pytest -m 'stress_test' -c pytest.ini --capture=no "$@"
33 changes: 23 additions & 10 deletions src/dbzero/core/collections/SGB_Tree/SGB_CompressedLookupTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,11 @@ DB0_PACKED_END
// node will be sorted if needed (only if in READ/WRITE mode)
if (this->m_access_type == AccessType::READ_WRITE) {
this->onNodeLookup(node);
}
if (!node->header().canFit(key)) {
return { nullptr, sg_tree_const_iterator() };
}
// within the node look up by compressed key
// within the node look up by compressed key (only if able to fit)
return { node->lower_equal_bound(node->header().compress(key), this->m_heap_comp), node };
}

Expand All @@ -305,15 +308,18 @@ DB0_PACKED_END
this->onNodeLookup(node);
}
// within the node look up by compressed key
auto item_ptr = node->lower_equal_bound(node->header().compress(key), this->m_heap_comp);
if (!item_ptr) {
return std::nullopt;
// NOTE: if unable to fit key then the item cannot be present in the node
if (node->header().canFit(key)) {
auto item_ptr = node->lower_equal_bound(node->header().compress(key), this->m_heap_comp);
if (item_ptr) {
// return uncompressed
return node->header().uncompress(*item_ptr);
}
}

// return uncompressed
return node->header().uncompress(*item_ptr);
}

return std::nullopt;
}

// Locate first element which is greater or equal to the key
template <typename KeyT> std::optional<ItemT> upper_equal_bound(const KeyT &key) const
{
Expand All @@ -332,11 +338,14 @@ DB0_PACKED_END
this->onNodeLookup(node);
}
// within the node look up by compressed key
auto item_ptr = node->upper_equal_bound(node->header().compress(key), this->m_heap_comp);
const CompressedItemT *item_ptr = nullptr;
if (node->header().canFit(key)) {
item_ptr = node->upper_equal_bound(node->header().compress(key), this->m_heap_comp);
}
if (!item_ptr) {
// check within the next node
++node;
if (node == base_t::end()) {
if (node == base_t::end() || !node->header().canFit(key)) {
return std::nullopt;
}
item_ptr = node->upper_equal_bound(node->header().compress(key), this->m_heap_comp);
Expand All @@ -361,7 +370,11 @@ DB0_PACKED_END
if (this->m_access_type == AccessType::READ_WRITE) {
this->onNodeLookup(node);
}
if (!node->header().canFit(key)) {
return nullptr;
}
// within the node look up by compressed key
// NOTE: if unable to fit key then the item cannot be present in the node
return node->lower_equal_bound(node->header().compress(key), this->m_heap_comp);
}

Expand Down
10 changes: 6 additions & 4 deletions src/dbzero/core/storage/SparseIndex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
namespace db0

{
DB0_PACKED_BEGIN

struct SI_Item;
struct SI_CompressedItem;
Expand All @@ -32,6 +31,7 @@ DB0_PACKED_BEGIN
bool operator()(std::pair<std::uint64_t, std::uint32_t>, const SI_Item &) const;
};

DB0_PACKED_BEGIN
struct DB0_PACKED_ATTR SI_Item
{
using CompT = SI_ItemCompT;
Expand Down Expand Up @@ -65,7 +65,8 @@ DB0_PACKED_BEGIN
return m_state_num;
}
};

DB0_PACKED_END

struct SI_CompressedItemCompT
{
bool operator()(const SI_CompressedItem &, const SI_CompressedItem &) const;
Expand All @@ -75,8 +76,9 @@ DB0_PACKED_BEGIN
{
bool operator()(const SI_CompressedItem &, const SI_CompressedItem &) const;
};

// Compressed items are actual in-memory representation
DB0_PACKED_BEGIN
struct DB0_PACKED_ATTR SI_CompressedItem
{
using CompT = SI_CompressedItemCompT;
Expand Down Expand Up @@ -120,8 +122,8 @@ DB0_PACKED_BEGIN

std::string toString() const;
};
DB0_PACKED_END

using SparseIndex = SparseIndexBase<SI_Item, SI_CompressedItem>;

DB0_PACKED_END
}
8 changes: 7 additions & 1 deletion src/dbzero/core/storage/SparseIndexBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ namespace db0

// Compress the key part only for lookup purposes
CompressedItemT compress(std::pair<PageNumT, StateNumT>) const;

CompressedItemT compress(const ItemT &) const;

ItemT uncompress(const CompressedItemT &) const;

// From a compressed item, retrieve the (logical) page number only
PageNumT getPageNum(const CompressedItemT &) const;

bool canFit(std::pair<PageNumT, StateNumT>) const;
bool canFit(const ItemT &) const;

std::string toString(const CompressedItemT &) const;
Expand Down Expand Up @@ -309,6 +309,12 @@ DB0_PACKED_END
bool SparseIndexBase<ItemT, CompressedItemT>::BlockHeader::canFit(const ItemT &item) const {
return this->m_first_page_num == (item.m_page_num >> 24);
}

// Lookup-key overload of canFit.
// Reports whether the page number of a (page number, state number) key matches
// this header: the page number with its lowest 24 bits shifted out must equal
// m_first_page_num. The state number (item.second) is not consulted.
template <typename ItemT, typename CompressedItemT>
bool SparseIndexBase<ItemT, CompressedItemT>::BlockHeader::canFit(std::pair<PageNumT, StateNumT> item) const
{
    const auto page_num_high_bits = item.first >> 24;
    return this->m_first_page_num == page_num_high_bits;
}

template <typename ItemT, typename CompressedItemT>
ItemT SparseIndexBase<ItemT, CompressedItemT>::lookup(PageNumT page_num, StateNumT state_num) const {
Expand Down