Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbzero/dbzero/dbzero.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def load_dynamic(name, path):

def __bootstrap__():
global __bootstrap__, __loader__, __file__
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/debug", "/usr/local/lib/python3/dist-packages/dbzero/"]
paths = [os.path.join(os.path.split(__file__)[0]), "/src/dev/build/release", "/usr/local/lib/python3/dist-packages/dbzero/"]
__file__ = None
for path in paths:
if os.path.isdir(path):
Expand Down
2 changes: 1 addition & 1 deletion dbzero/dbzero/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def init(dbzero_root: str, **kwargs) -> None:

init_kwargs = {}

config_keys = ("autocommit", "autocommit_interval", "cache_size", "lang_cache_size")
config_keys = ("autocommit", "autocommit_interval", "cache_size", "lang_cache_size", "suppress_dist_overflow_error")
config = {}
for key in config_keys:
if key in kwargs:
Expand Down
4 changes: 2 additions & 2 deletions python_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def db0_autocommit_fixture(request):
shutil.rmtree(DB0_DIR)
# create empty directory
os.mkdir(DB0_DIR)
db0.init(DB0_DIR, autocommit=True, autocommit_interval=request.param)
db0.init(DB0_DIR, autocommit=True, autocommit_interval=request.param, suppress_dist_overflow_error=True)
db0.open("my-test-prefix")
yield db0
gc.collect()
Expand All @@ -92,7 +92,7 @@ def db0_no_autocommit():
# create empty directory
os.mkdir(DB0_DIR)
# disable autocommit on all prefixes
db0.init(DB0_DIR, autocommit=False)
db0.init(DB0_DIR, autocommit=False, suppress_dist_overflow_error=True)
db0.open("my-test-prefix")
yield db0
db0.close()
Expand Down
3 changes: 2 additions & 1 deletion src/dbzero/bindings/python/PyAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ namespace db0::python
{"cache_size", []{ return PyLong_FromUnsignedLongLong(BaseWorkspace::DEFAULT_CACHE_SIZE); }},
{"lang_cache_size", []{ return PyLong_FromUnsignedLongLong(LangCache::DEFAULT_CAPACITY); }},
{"autocommit", []{ Py_RETURN_TRUE; }},
{"autocommit_interval", []{ return PyLong_FromUnsignedLongLong(Workspace::DEFAULT_AUTOCOMMIT_INTERVAL_MS); }}
{"autocommit_interval", []{ return PyLong_FromUnsignedLongLong(Workspace::DEFAULT_AUTOCOMMIT_INTERVAL_MS); }},
{"suppress_dist_overflow_error", []{ Py_RETURN_FALSE; }}
};
for (const auto &[key_str, default_fn] : defaults) {
// Populate default values so then can be easily accessed with get_config
Expand Down
5 changes: 5 additions & 0 deletions src/dbzero/core/exception/Exceptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ namespace db0
{
}

CacheException::CacheException()
: CriticalException(exception_id)
{
}

}
7 changes: 7 additions & 0 deletions src/dbzero/core/exception/Exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,11 @@ namespace db0
BadAddressException();
};

class CacheException: public CriticalException
{
public:
static constexpr int exception_id = EXCEPTION_ID_PREFIX::BASIC | 0x0f;
CacheException();
};

}
7 changes: 6 additions & 1 deletion src/dbzero/core/memory/CacheRecycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace db0
CacheRecycler::CacheRecycler(std::size_t capacity, const std::atomic<std::size_t> &dirty_meter,
std::optional<std::size_t> flush_size,
std::function<void(std::size_t limit)> flush_dirty,
std::function<bool(bool threshold_reached)> flush_callback)
std::function<bool(bool threshold_reached)> flush_callback,
bool suppress_dist_overflow_error)
: m_capacity(capacity)
// NOTE: buffers are overprovisioned
, m_res_bufs { getMaxSize(m_capacity), getMaxSize(m_capacity) }
Expand All @@ -36,6 +37,7 @@ namespace db0
, m_flush_size(flush_size.value_or(DEFAULT_FLUSH_SIZE))
, m_flush_dirty(flush_dirty)
, m_flush_callback(flush_callback)
, m_suppress_dist_overflow_error(suppress_dist_overflow_error)
{
}

Expand Down Expand Up @@ -131,6 +133,9 @@ namespace db0
updateSize(lock, m_capacity - flush_size);
flushed = true;
flush_result = m_current_size[priority] <= (m_capacity - flush_size);
if(getCurrentSize() >= m_capacity && !m_suppress_dist_overflow_error){
THROWF(db0::CacheException) << "DIST Memory Overflow. Too many Python objects" << THROWF_END;
}
}
// resize is a costly operation but cannot be avoided if the number of locked
// resources exceeds the assumed limit
Expand Down
4 changes: 3 additions & 1 deletion src/dbzero/core/memory/CacheRecycler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace db0
*/
CacheRecycler(std::size_t capacity, const std::atomic<std::size_t> &dirty_meter, std::optional<std::size_t> flush_size = {},
std::function<void(std::size_t limit)> flush_dirty = {},
std::function<bool(bool threshold_reached)> flush_callback = {});
std::function<bool(bool threshold_reached)> flush_callback = {},
bool suppress_dist_overflow_error = false);

void update(std::shared_ptr<ResourceLock> res_lock);

Expand Down Expand Up @@ -90,6 +91,7 @@ namespace db0
std::function<void(std::size_t limit)> m_flush_dirty;
std::function<bool(bool)> m_flush_callback;
std::pair<bool, bool> m_last_flush_callback_result = {true, false};
bool m_suppress_dist_overflow_error = false;

void resize(std::unique_lock<std::mutex> &, std::size_t new_size, int priority);

Expand Down
2 changes: 1 addition & 1 deletion src/dbzero/workspace/Fixture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ namespace db0
for (auto &handler: m_flush_handlers) {
handler();
}

m_lang_cache.clear(true);
// lock for exclusive access
{
std::unique_lock<std::shared_mutex> lock(m_commit_mutex);
Expand Down
9 changes: 6 additions & 3 deletions src/dbzero/workspace/Workspace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace db0
{

BaseWorkspace::BaseWorkspace(const std::string &root_path, std::optional<std::size_t> cache_size,
std::optional<std::size_t> slab_cache_size, std::optional<std::size_t> flush_size, std::optional<LockFlags> default_lock_flags)
std::optional<std::size_t> slab_cache_size, std::optional<std::size_t> flush_size,
std::optional<LockFlags> default_lock_flags, std::optional<bool> suppress_dist_overflow_error)
: m_prefix_catalog(root_path)
, m_default_lock_flags(default_lock_flags ? *default_lock_flags : LockFlags())
, m_cache_recycler(cache_size ? *cache_size : DEFAULT_CACHE_SIZE, m_dirty_meter, flush_size,
Expand All @@ -20,7 +21,8 @@ namespace db0
},
[this](bool threshold_reached) -> bool {
return this->onCacheFlushed(threshold_reached);
}
},
suppress_dist_overflow_error ? *suppress_dist_overflow_error : false
)
, m_slab_recycler(slab_cache_size ? *slab_cache_size : DEFAULT_SLAB_CACHE_SIZE)
{
Expand Down Expand Up @@ -228,7 +230,8 @@ namespace db0
std::optional<std::size_t> slab_cache_size, std::optional<std::size_t> vobject_cache_size,
std::optional<std::size_t> flush_size, std::function<void(db0::swine_ptr<Fixture> &, bool, bool, bool)> fixture_initializer,
std::shared_ptr<Config> config, std::optional<LockFlags> default_lock_flags)
: BaseWorkspace(root_path, cache_size, slab_cache_size, flush_size, default_lock_flags)
: BaseWorkspace(root_path, cache_size, slab_cache_size, flush_size,
default_lock_flags, m_config ? config->get<bool>("suppress_dist_overflow_error") : false)
, m_config(config)
, m_fixture_catalog(m_prefix_catalog)
, m_fixture_initializer(fixture_initializer)
Expand Down
3 changes: 2 additions & 1 deletion src/dbzero/workspace/Workspace.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ namespace db0
**/
BaseWorkspace(const std::string &root_path = "", std::optional<std::size_t> cache_size = {},
std::optional<std::size_t> slab_cache_size = {}, std::optional<std::size_t> flush_size = {},
std::optional<LockFlags> default_lock_flags = {});
std::optional<LockFlags> default_lock_flags = {},
std::optional<bool> suppress_dist_overflow_error = {});
virtual ~BaseWorkspace()= default;

/**
Expand Down