diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 45c5b0582..40b476669 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -14,11 +14,11 @@ on: jobs: build: - runs-on: 'ubuntu-22.04' + runs-on: 'ubuntu-24.04' name: 'github pages' env: BUILD_TYPE: 'Release' - PYTHON_VERSION: '3.9' + PYTHON_VERSION: '3.12' strategy: fail-fast: false steps: diff --git a/.github/workflows/keyvi.yml b/.github/workflows/keyvi.yml index 5388a425a..3d94581c4 100644 --- a/.github/workflows/keyvi.yml +++ b/.github/workflows/keyvi.yml @@ -20,7 +20,7 @@ jobs: fail-fast: false matrix: type: ['Release', 'Debug'] - os: ['macos-latest', 'ubuntu-22.04'] + os: ['macos-latest', 'ubuntu-24.04'] steps: - name: install Linux deps if: runner.os == 'Linux' @@ -55,4 +55,4 @@ jobs: - name: Unit tests id: unit_tests run: | - build/unit_test_all + build/unit_test_all -l unit_scope diff --git a/keyvi/include/keyvi/dictionary/match.h b/keyvi/include/keyvi/dictionary/match.h index ee80d990d..0fe922107 100644 --- a/keyvi/include/keyvi/dictionary/match.h +++ b/keyvi/include/keyvi/dictionary/match.h @@ -59,7 +59,7 @@ namespace dictionary { #ifdef Py_PYTHON_H class attributes_visitor : public boost::static_visitor { public: - PyObject* operator()(int i) const { return PyInt_FromLong(i); } + PyObject* operator()(int i) const { return PyLong_FromLong(i); } PyObject* operator()(double i) const { return PyFloat_FromDouble(i); } diff --git a/keyvi/include/keyvi/index/internal/index_writer_worker.h b/keyvi/include/keyvi/index/internal/index_writer_worker.h index 12690adfe..ea2f30e2d 100644 --- a/keyvi/include/keyvi/index/internal/index_writer_worker.h +++ b/keyvi/include/keyvi/index/internal/index_writer_worker.h @@ -70,10 +70,12 @@ class IndexWriterWorker final { using compiler_t = std::shared_ptr; struct IndexPayload { explicit IndexPayload(const std::string& index_directory, const keyvi::util::parameters_t& params) - : compiler_(), + : external_process_ctx_(), + compiler_(), write_counter_(0), segments_(), - mutex_(), + segments_mutex_(), + flush_mutex_(), index_directory_(index_directory), index_toc_file_(index_directory_ / "index.toc"), index_toc_file_part_(index_directory_ / "index.toc.part"), @@ -88,11 +90,13 @@ class IndexWriterWorker final { segments_ = std::make_shared(); } + boost::asio::io_context external_process_ctx_; compiler_t compiler_; std::atomic_size_t write_counter_; segments_t segments_; std::weak_ptr segments_weak_; - std::mutex mutex_; + std::mutex segments_mutex_; + std::mutex flush_mutex_; const boost::filesystem::path index_directory_; const boost::filesystem::path index_toc_file_; const boost::filesystem::path index_toc_file_part_; @@ -136,7 +140,7 @@ class IndexWriterWorker final { segments_t segments = payload_.segments_weak_.lock(); if (!segments) { TRACE("recreate segments weak ptr"); - std::unique_lock lock(payload_.mutex_); + std::unique_lock lock(payload_.segments_mutex_); payload_.segments_weak_ = payload_.segments_; segments = payload_.segments_; } @@ -205,14 +209,15 @@ class IndexWriterWorker final { Compile(&payload); }); } else { - std::mutex m; std::condition_variable c; - std::unique_lock lock(m); + std::unique_lock lock(payload_.flush_mutex_); - compiler_active_object_([&m, &c](IndexPayload& payload) { - PersistDeletes(&payload); - Compile(&payload); - std::unique_lock waitLock(m); + compiler_active_object_([&c](IndexPayload& payload) { + { + PersistDeletes(&payload); + Compile(&payload); + std::unique_lock lock(payload.flush_mutex_); + } c.notify_all(); }); @@ -319,7 +324,7 @@ class IndexWriterWorker final { // thread-safe swap { - std::unique_lock lock(payload_.mutex_); + std::unique_lock lock(payload_.segments_mutex_); payload_.segments_.swap(new_segments); } WriteToc(&payload_); @@ -390,7 +395,8 @@ class IndexWriterWorker final { payload_.merge_jobs_.emplace_back(to_merge, merge_policy_id, p, payload_.settings_); // force external merge if low on filedescriptors - payload_.merge_jobs_.back().Run(payload_.segments_->size() + to_merge.size() + 10 > payload_.max_segments_); + payload_.merge_jobs_.back().Run(&payload_.external_process_ctx_, + payload_.segments_->size() + to_merge.size() + 10 > payload_.max_segments_); } void LoadIndex() { @@ -466,7 +472,7 @@ class IndexWriterWorker final { // thread-safe swap { - std::unique_lock lock(payload->mutex_); + std::unique_lock lock(payload->segments_mutex_); payload->segments_.swap(new_segments); } diff --git a/keyvi/include/keyvi/index/internal/merge_job.h b/keyvi/include/keyvi/index/internal/merge_job.h index 30341ac31..9d2f0a278 100644 --- a/keyvi/include/keyvi/index/internal/merge_job.h +++ b/keyvi/include/keyvi/index/internal/merge_job.h @@ -26,7 +26,7 @@ #include // NOLINT #include -#include +#include #include "keyvi/dictionary/dictionary_merger.h" #include "keyvi/dictionary/dictionary_types.h" @@ -78,7 +78,7 @@ class MergeJob final { MergeJob& operator=(MergeJob const&) = delete; MergeJob(const MergeJob& that) = delete; - void Run(bool force_external_merge = false) { + void Run(boost::asio::io_context* external_process_ctx, bool force_external_merge = false) { uint64_t job_size = 0; for (const segment_t& segment : payload_.segments_) { @@ -88,7 +88,7 @@ class MergeJob final { if (force_external_merge == false && job_size < payload_.settings_.GetSegmentExternalMergeKeyThreshold()) { DoInternalMerge(); } else { - DoExternalProcessMerge(); + DoExternalProcessMerge(external_process_ctx); } } @@ -128,7 +128,7 @@ class MergeJob final { private: MergeJobPayload payload_; size_t id_; - std::shared_ptr external_process_; + std::shared_ptr external_process_; std::thread internal_merge_; void DoInternalMerge() { @@ -154,7 +154,7 @@ class MergeJob final { }); } - void DoExternalProcessMerge() { + void DoExternalProcessMerge(boost::asio::io_context* external_process_ctx) { payload_.start_time_ = std::chrono::system_clock::now(); std::vector args; @@ -169,7 +169,8 @@ class MergeJob final { args.push_back("-o"); args.push_back(payload_.output_filename_.string()); - external_process_.reset(new boost::process::child(payload_.settings_.GetKeyviMergerBin(), args)); + external_process_.reset( + new boost::process::v2::process(*external_process_ctx, payload_.settings_.GetKeyviMergerBin(), args)); } bool TryFinalizeMerge() { @@ -177,6 +178,8 @@ class MergeJob final { if (!external_process_->running()) { payload_.exit_code_ = external_process_->exit_code(); payload_.process_finished_ = true; + // free the process early, closes internal file descriptor + external_process_.reset(); return true; } } else if (internal_merge_.joinable()) { @@ -192,6 +195,8 @@ class MergeJob final { if (external_process_) { external_process_->wait(); payload_.exit_code_ = external_process_->exit_code(); + // free the process early, closes internal file descriptor + external_process_.reset(); } else { internal_merge_.join(); // exit code set by merge thread diff --git a/keyvi/tests/keyvi/index/index_limits_test.cpp b/keyvi/tests/keyvi/index/index_limits_test.cpp index 33a54c121..a41ee501f 100644 --- a/keyvi/tests/keyvi/index/index_limits_test.cpp +++ b/keyvi/tests/keyvi/index/index_limits_test.cpp @@ -28,6 +28,8 @@ #else #include +#include + #include #include @@ -63,7 +65,7 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { using boost::filesystem::temp_directory_path; using boost::filesystem::unique_path; - size_t old_limit = limit_filedescriptors(20); + size_t old_limit = limit_filedescriptors(40); auto tmp_path = temp_directory_path(); tmp_path /= unique_path("index-limits-test-temp-index-%%%%-%%%%-%%%%-%%%%"); @@ -85,7 +87,7 @@ BOOST_AUTO_TEST_CASE(filedescriptor_limit) { boost::filesystem::remove_all(tmp_path); size_t increased_file_descriptors = keyvi::util::OsUtils::TryIncreaseFileDescriptors(); - BOOST_CHECK(increased_file_descriptors > 20); + BOOST_CHECK(increased_file_descriptors > 40); limit_filedescriptors(old_limit); } diff --git a/keyvi/tests/keyvi/index/internal/merge_job_test.cpp b/keyvi/tests/keyvi/index/internal/merge_job_test.cpp index b2b91b227..3afab1601 100644 --- a/keyvi/tests/keyvi/index/internal/merge_job_test.cpp +++ b/keyvi/tests/keyvi/index/internal/merge_job_test.cpp @@ -24,7 +24,9 @@ */ #include // NOLINT +#include #include // NOLINT +#include #include @@ -51,6 +53,8 @@ namespace internal { BOOST_AUTO_TEST_SUITE(MergeJobTests) BOOST_AUTO_TEST_CASE(basic_merge) { + boost::asio::io_context external_process_ctx; + std::vector> test_data = { {"abc", "{a:1}"}, {"abbc", "{b:2}"}, {"abbcd", "{c:3}"}, {"abcde", "{a:1}"}, {"abdd", "{b:2}"}, {"bba", "{c:3}"}, }; @@ -70,7 +74,7 @@ BOOST_AUTO_TEST_CASE(basic_merge) { boost::filesystem::path p("merged.kv"); IndexSettings settings({{KEYVIMERGER_BIN, get_keyvimerger_bin()}}); MergeJob m({w1, w2}, 0, p, settings); - m.Run(); + m.Run(&external_process_ctx); int retry = 100; while (retry > 0) { diff --git a/python/src/addons/JsonVector.pyx b/python/src/addons/JsonVector.pyx index 33c582c9d..64a01a0b0 100644 --- a/python/src/addons/JsonVector.pyx +++ b/python/src/addons/JsonVector.pyx @@ -1,7 +1,7 @@ def __getitem__(self, index ): - assert isinstance(index, (int, long)), 'arg index wrong type' + assert isinstance(index, (int, int)), 'arg index wrong type' cdef libcpp_utf8_string _r = self.inst.get().Get((index)) py_result = json.loads(_r.decode('utf-8'))