Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ on:

jobs:
build:
runs-on: 'ubuntu-22.04'
runs-on: 'ubuntu-24.04'
name: 'github pages'
env:
BUILD_TYPE: 'Release'
PYTHON_VERSION: '3.9'
PYTHON_VERSION: '3.12'
strategy:
fail-fast: false
steps:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/keyvi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
fail-fast: false
matrix:
type: ['Release', 'Debug']
os: ['macos-latest', 'ubuntu-22.04']
os: ['macos-latest', 'ubuntu-24.04']
steps:
- name: install Linux deps
if: runner.os == 'Linux'
Expand Down Expand Up @@ -55,4 +55,4 @@ jobs:
- name: Unit tests
id: unit_tests
run: |
build/unit_test_all
build/unit_test_all -l unit_scope
2 changes: 1 addition & 1 deletion keyvi/include/keyvi/dictionary/match.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace dictionary {
#ifdef Py_PYTHON_H
class attributes_visitor : public boost::static_visitor<PyObject*> {
public:
PyObject* operator()(int i) const { return PyInt_FromLong(i); }
PyObject* operator()(int i) const { return PyLong_FromLong(i); }

PyObject* operator()(double i) const { return PyFloat_FromDouble(i); }

Expand Down
32 changes: 19 additions & 13 deletions keyvi/include/keyvi/index/internal/index_writer_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ class IndexWriterWorker final {
using compiler_t = std::shared_ptr<dictionary::JsonDictionaryIndexCompiler>;
struct IndexPayload {
explicit IndexPayload(const std::string& index_directory, const keyvi::util::parameters_t& params)
: compiler_(),
: external_process_ctx_(),
compiler_(),
write_counter_(0),
segments_(),
mutex_(),
segments_mutex_(),
flush_mutex_(),
index_directory_(index_directory),
index_toc_file_(index_directory_ / "index.toc"),
index_toc_file_part_(index_directory_ / "index.toc.part"),
Expand All @@ -88,11 +90,13 @@ class IndexWriterWorker final {
segments_ = std::make_shared<segment_vec_t>();
}

boost::asio::io_context external_process_ctx_;
compiler_t compiler_;
std::atomic_size_t write_counter_;
segments_t segments_;
std::weak_ptr<segment_vec_t> segments_weak_;
std::mutex mutex_;
std::mutex segments_mutex_;
std::mutex flush_mutex_;
const boost::filesystem::path index_directory_;
const boost::filesystem::path index_toc_file_;
const boost::filesystem::path index_toc_file_part_;
Expand Down Expand Up @@ -136,7 +140,7 @@ class IndexWriterWorker final {
segments_t segments = payload_.segments_weak_.lock();
if (!segments) {
TRACE("recreate segments weak ptr");
std::unique_lock<std::mutex> lock(payload_.mutex_);
std::unique_lock<std::mutex> lock(payload_.segments_mutex_);
payload_.segments_weak_ = payload_.segments_;
segments = payload_.segments_;
}
Expand Down Expand Up @@ -205,14 +209,15 @@ class IndexWriterWorker final {
Compile(&payload);
});
} else {
std::mutex m;
std::condition_variable c;
std::unique_lock<std::mutex> lock(m);
std::unique_lock<std::mutex> lock(payload_.flush_mutex_);

compiler_active_object_([&m, &c](IndexPayload& payload) {
PersistDeletes(&payload);
Compile(&payload);
std::unique_lock<std::mutex> waitLock(m);
compiler_active_object_([&c](IndexPayload& payload) {
{
PersistDeletes(&payload);
Compile(&payload);
std::unique_lock<std::mutex> lock(payload.flush_mutex_);
}
c.notify_all();
});

Expand Down Expand Up @@ -319,7 +324,7 @@ class IndexWriterWorker final {

// thread-safe swap
{
std::unique_lock<std::mutex> lock(payload_.mutex_);
std::unique_lock<std::mutex> lock(payload_.segments_mutex_);
payload_.segments_.swap(new_segments);
}
WriteToc(&payload_);
Expand Down Expand Up @@ -390,7 +395,8 @@ class IndexWriterWorker final {
payload_.merge_jobs_.emplace_back(to_merge, merge_policy_id, p, payload_.settings_);

// force external merge if low on filedescriptors
payload_.merge_jobs_.back().Run(payload_.segments_->size() + to_merge.size() + 10 > payload_.max_segments_);
payload_.merge_jobs_.back().Run(&payload_.external_process_ctx_,
payload_.segments_->size() + to_merge.size() + 10 > payload_.max_segments_);
}

void LoadIndex() {
Expand Down Expand Up @@ -466,7 +472,7 @@ class IndexWriterWorker final {

// thread-safe swap
{
std::unique_lock<std::mutex> lock(payload->mutex_);
std::unique_lock<std::mutex> lock(payload->segments_mutex_);
payload->segments_.swap(new_segments);
}

Expand Down
17 changes: 11 additions & 6 deletions keyvi/include/keyvi/index/internal/merge_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <thread> // NOLINT
#include <vector>

#include <boost/process.hpp>
#include <boost/process/v2/process.hpp>

#include "keyvi/dictionary/dictionary_merger.h"
#include "keyvi/dictionary/dictionary_types.h"
Expand Down Expand Up @@ -78,7 +78,7 @@ class MergeJob final {
MergeJob& operator=(MergeJob const&) = delete;
MergeJob(const MergeJob& that) = delete;

void Run(bool force_external_merge = false) {
void Run(boost::asio::io_context* external_process_ctx, bool force_external_merge = false) {
uint64_t job_size = 0;

for (const segment_t& segment : payload_.segments_) {
Expand All @@ -88,7 +88,7 @@ class MergeJob final {
if (force_external_merge == false && job_size < payload_.settings_.GetSegmentExternalMergeKeyThreshold()) {
DoInternalMerge();
} else {
DoExternalProcessMerge();
DoExternalProcessMerge(external_process_ctx);
}
}

Expand Down Expand Up @@ -128,7 +128,7 @@ class MergeJob final {
private:
MergeJobPayload payload_;
size_t id_;
std::shared_ptr<boost::process::child> external_process_;
std::shared_ptr<boost::process::v2::process> external_process_;
std::thread internal_merge_;

void DoInternalMerge() {
Expand All @@ -154,7 +154,7 @@ class MergeJob final {
});
}

void DoExternalProcessMerge() {
void DoExternalProcessMerge(boost::asio::io_context* external_process_ctx) {
payload_.start_time_ = std::chrono::system_clock::now();

std::vector<std::string> args;
Expand All @@ -169,14 +169,17 @@ class MergeJob final {
args.push_back("-o");
args.push_back(payload_.output_filename_.string());

external_process_.reset(new boost::process::child(payload_.settings_.GetKeyviMergerBin(), args));
external_process_.reset(
new boost::process::v2::process(*external_process_ctx, payload_.settings_.GetKeyviMergerBin(), args));
}

bool TryFinalizeMerge() {
if (external_process_) {
if (!external_process_->running()) {
payload_.exit_code_ = external_process_->exit_code();
payload_.process_finished_ = true;
// free the process early, closes internal file descriptor
external_process_.reset();
return true;
}
} else if (internal_merge_.joinable()) {
Expand All @@ -192,6 +195,8 @@ class MergeJob final {
if (external_process_) {
external_process_->wait();
payload_.exit_code_ = external_process_->exit_code();
// free the process early, closes internal file descriptor
external_process_.reset();
} else {
internal_merge_.join();
// exit code set by merge thread
Expand Down
6 changes: 4 additions & 2 deletions keyvi/tests/keyvi/index/index_limits_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#else
#include <sys/resource.h>

#include <string>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`std::string` is used on line 39. It obviously worked before, but only because `<string>` was implicitly included by one of the other includes.

The reason this pops up now: the code is checked with cpplint, a static code checker, which reports an error called "include what you use". cpplint must pass as part of the checkstyle workflow that runs on the PR, and checkstyle only runs on files that changed. In other words: because I made a change to this file, cpplint forced me to clean up.


#include <boost/filesystem.hpp>
#include <boost/test/unit_test.hpp>

Expand Down Expand Up @@ -63,7 +65,7 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) {
using boost::filesystem::temp_directory_path;
using boost::filesystem::unique_path;

size_t old_limit = limit_filedescriptors(20);
size_t old_limit = limit_filedescriptors(40);

auto tmp_path = temp_directory_path();
tmp_path /= unique_path("index-limits-test-temp-index-%%%%-%%%%-%%%%-%%%%");
Expand All @@ -85,7 +87,7 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) {
boost::filesystem::remove_all(tmp_path);

size_t increased_file_descriptors = keyvi::util::OsUtils::TryIncreaseFileDescriptors();
BOOST_CHECK(increased_file_descriptors > 20);
BOOST_CHECK(increased_file_descriptors > 40);

limit_filedescriptors(old_limit);
}
Expand Down
6 changes: 5 additions & 1 deletion keyvi/tests/keyvi/index/internal/merge_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
*/

#include <chrono> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <vector>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer: `std::string` and `std::vector` are used in this file; however, this file hasn't been touched since cpplint was introduced as a CI workflow.


#include <boost/test/unit_test.hpp>

Expand All @@ -51,6 +53,8 @@ namespace internal {
BOOST_AUTO_TEST_SUITE(MergeJobTests)

BOOST_AUTO_TEST_CASE(basic_merge) {
boost::asio::io_context external_process_ctx;

std::vector<std::pair<std::string, std::string>> test_data = {
{"abc", "{a:1}"}, {"abbc", "{b:2}"}, {"abbcd", "{c:3}"}, {"abcde", "{a:1}"}, {"abdd", "{b:2}"}, {"bba", "{c:3}"},
};
Expand All @@ -70,7 +74,7 @@ BOOST_AUTO_TEST_CASE(basic_merge) {
boost::filesystem::path p("merged.kv");
IndexSettings settings({{KEYVIMERGER_BIN, get_keyvimerger_bin()}});
MergeJob m({w1, w2}, 0, p, settings);
m.Run();
m.Run(&external_process_ctx);

int retry = 100;
while (retry > 0) {
Expand Down
2 changes: 1 addition & 1 deletion python/src/addons/JsonVector.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@


def __getitem__(self, index ):
assert isinstance(index, (int, long)), 'arg index wrong type'
assert isinstance(index, (int, int)), 'arg index wrong type'

cdef libcpp_utf8_string _r = self.inst.get().Get((<size_t>index))
py_result = json.loads(_r.decode('utf-8'))
Expand Down
Loading