Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions src/dbzero/core/memory/CacheRecycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ namespace db0
}
m_current_size[priority] += lock_size;
if (getCurrentSize() > m_capacity) {
// try reducing cache utilization to capacity minus flush size
auto flush_size = std::min(m_capacity >> 1, m_flush_size);
updateSize(lock, m_capacity - flush_size);
flushed = true;
flush_result = m_current_size[priority] <= (m_capacity - flush_size);
auto flush_returned_values = _flush(lock, priority);
flushed = flush_returned_values.first;
flush_result = flush_returned_values.second;
}
// resize is a costly operation but cannot be avoided if the number of locked
// resources exceeds the assumed limit
Expand Down Expand Up @@ -247,5 +245,30 @@ namespace db0
std::unique_lock<std::mutex> lock(m_mutex);
return { m_current_size[0], m_current_size[1] };
}


std::pair<bool, bool> CacheRecycler::_flush(std::unique_lock<std::mutex> &lock, int priority)
{
auto now = std::chrono::high_resolution_clock::now();
if (now >= m_next_flush_time) {
// try reducing cache utilization to capacity minus flush size
auto flush_size = std::min(m_capacity >> 1, m_flush_size);
auto size_before_flush = getCurrentSize();
updateSize(lock, m_capacity - flush_size);
// Update backoff state based on flush result(need to flush more than 10 % of flush size)
if ((size_before_flush - getCurrentSize()) > (flush_size/10)) {
// Success: reset delay
m_current_flush_delay = std::chrono::nanoseconds{0};
m_next_flush_time = std::chrono::high_resolution_clock::time_point{};
} else {
// Failure: apply exponential backoff
// adding +1 to avoid condition for zero delay
auto new_delay = std::min(m_current_flush_delay.count() * 2 + 1 , MAX_FLUSH_DELAY_NS);
m_current_flush_delay = std::chrono::nanoseconds{new_delay};
now = std::chrono::high_resolution_clock::now();
m_next_flush_time = now + m_current_flush_delay;
}
return { true, m_current_size[priority] <= (m_capacity - flush_size) };
}
return { false, false };
}
}
14 changes: 11 additions & 3 deletions src/dbzero/core/memory/CacheRecycler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <functional>
#include <optional>
#include <atomic>
#include <chrono>
#include <dbzero/core/memory/ResourceLock.hpp>
#include <dbzero/core/utils/FixedList.hpp>

Expand All @@ -21,6 +22,8 @@ namespace db0
{
public:
static constexpr std::size_t DEFAULT_FLUSH_SIZE = 256u << 20;
static constexpr std::int64_t INITIAL_FLUSH_DELAY_NS = 1'000; // 1us
static constexpr std::int64_t MAX_FLUSH_DELAY_NS = 1'000'000'000; // 1 second

/**
* Holds resource locks and recycles based on LRU policy
Expand Down Expand Up @@ -94,9 +97,12 @@ namespace db0
std::function<bool(bool)> m_flush_callback;
std::pair<bool, bool> m_last_flush_callback_result = {true, false};

void resize(std::unique_lock<std::mutex> &, std::size_t new_size, int priority);

/**
// Flush rate limiting
std::chrono::high_resolution_clock::time_point m_next_flush_time{};
std::chrono::nanoseconds m_current_flush_delay{0};

void resize(std::unique_lock<std::mutex> &, std::size_t new_size, int priority);
/**
* Adjusts cache size after updates, collect locks to unlock (can be unlocked off main thread)
* @param released_locks locks to be released
* @param release_size total number of bytes to be released
Expand All @@ -111,6 +117,8 @@ namespace db0
inline std::size_t getCurrentSize() const {
return m_current_size[0] + m_current_size[1];
}

std::pair<bool, bool> _flush(std::unique_lock<std::mutex> &, int priority);
};

}