diff --git a/src/ailego/CMakeLists.txt b/src/ailego/CMakeLists.txt index 29cf22cd1..1e524a1b2 100644 --- a/src/ailego/CMakeLists.txt +++ b/src/ailego/CMakeLists.txt @@ -16,6 +16,10 @@ set(EXTRA_LIBS ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS}) if(UNIX AND NOT APPLE) list(APPEND EXTRA_LIBS ${LIB_RT}) + find_library(LIB_AIO NAMES aio) + if(LIB_AIO) + list(APPEND EXTRA_LIBS ${LIB_AIO}) + endif() endif() if(NOT ANDROID AND AUTO_DETECT_ARCH) @@ -123,3 +127,8 @@ cc_library( LIBS ${EXTRA_LIBS} VERSION "${GIT_SRCS_VER}" ) + +if(LIB_AIO) + target_compile_definitions(zvec_ailego PUBLIC ZVEC_HAS_LIBAIO) + message(STATUS "Found libaio: ${LIB_AIO} (HNSW async prefetch enabled)") +endif() diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 3318db1ca..13d55b212 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -13,11 +13,14 @@ // limitations under the License. #include +#include #include #include #include #include #include +#include +#include "../io/aligned_async_io.h" #include #if defined(_MSC_VER) @@ -272,27 +275,71 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, } } -VecBufferPool::VecBufferPool(const std::string &filename, bool writable) { +VecBufferPool::VecBufferPool(const std::string &filename, bool writable, + bool enable_direct_io) { file_name_ = filename; writable_ = writable; #if defined(_MSC_VER) int flags = writable_ ? (O_RDWR | _O_BINARY) : (O_RDONLY | _O_BINARY); fd_ = _open(filename.c_str(), flags, 0644); + meta_fd_ = _open(filename.c_str(), flags, 0644); + (void)enable_direct_io; // O_DIRECT not supported on this path #else - int flags = writable_ ? O_RDWR : O_RDONLY; - fd_ = ::open(filename.c_str(), flags, 0644); + int base_flags = writable_ ? O_RDWR : O_RDONLY; + // Metadata channel: always buffered IO. Serves the unaligned + // header/footer/segment_meta reads & writes and benefits from page cache. 
+ meta_fd_ = ::open(filename.c_str(), base_flags, 0644); + // Page-data channel: optionally O_DIRECT; fall back to buffered open when + // the filesystem (tmpfs/overlayfs/...) rejects O_DIRECT. + int data_flags = base_flags; +#ifdef O_DIRECT + if (enable_direct_io) { + data_flags |= O_DIRECT; + } +#endif + fd_ = ::open(filename.c_str(), data_flags, 0644); +#ifdef O_DIRECT + if (fd_ < 0 && (data_flags & O_DIRECT)) { + LOG_WARN( + "VecBufferPool: open with O_DIRECT failed for file[%s] (errno=%d), " + "falling back to buffered IO", + filename.c_str(), errno); + fd_ = ::open(filename.c_str(), base_flags, 0644); + direct_io_enabled_ = false; + } else { + direct_io_enabled_ = (data_flags & O_DIRECT) != 0; + } +#else + (void)enable_direct_io; +#endif +#endif + if (fd_ < 0 || meta_fd_ < 0) { + if (fd_ >= 0) { +#if defined(_MSC_VER) + _close(fd_); +#else + ::close(fd_); #endif - if (fd_ < 0) { + } + if (meta_fd_ >= 0) { +#if defined(_MSC_VER) + _close(meta_fd_); +#else + ::close(meta_fd_); +#endif + } throw std::runtime_error("Failed to open file: " + filename); } #if defined(_MSC_VER) struct _stat64 st; if (_fstat64(fd_, &st) < 0) { _close(fd_); + _close(meta_fd_); #else struct stat st; if (fstat(fd_, &st) < 0) { ::close(fd_); + ::close(meta_fd_); #endif throw std::runtime_error("Failed to stat file: " + filename); } @@ -375,16 +422,24 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { } size_t page_offset = page_id * kVectorPageSize; - size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset); - if (expected_bytes < kVectorPageSize) { - std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes); - } - ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset); - if (read_bytes != static_cast(expected_bytes)) { + // O_DIRECT requires the IO length to be a multiple of the device block + // size. 
The backing file size is always page-aligned (IndexMapping + + // append_segment guarantee this), so reading a full page never reads past + // EOF; the tail padding is the file's own zero region. In direct mode we + // MUST read the whole page; the buffered path keeps the legacy short-read + // + zero-pad behaviour. + size_t read_len = direct_io_enabled_ + ? kVectorPageSize + : std::min(kVectorPageSize, file_size_ - page_offset); + if (read_len < kVectorPageSize) { + std::memset(buffer + read_len, 0, kVectorPageSize - read_len); + } + ssize_t read_bytes = zvec_pread(fd_, buffer, read_len, page_offset); + if (read_bytes != static_cast(read_len)) { LOG_ERROR( "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " "offset[%zu], expected[%zu], got[%zd]", - file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes); + file_name_.c_str(), page_id, page_offset, read_len, read_bytes); MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); return nullptr; } @@ -392,7 +447,7 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { } int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { - ssize_t read_bytes = zvec_pread(fd_, buffer, length, offset); + ssize_t read_bytes = zvec_pread(meta_fd_, buffer, length, offset); if (read_bytes != static_cast(length)) { LOG_ERROR( "Buffer pool failed to read file at offset: file[%s], offset[%zu], " @@ -446,7 +501,7 @@ int VecBufferPool::write_meta(size_t offset, size_t length, file_name_.c_str()); return -1; } - ssize_t w = zvec_pwrite(fd_, buffer, length, offset); + ssize_t w = zvec_pwrite(meta_fd_, buffer, length, offset); if (w != static_cast(length)) { LOG_ERROR( "Buffer pool failed to write meta: file[%s], offset[%zu], " @@ -495,6 +550,10 @@ bool VecBufferPool::extend_file(size_t new_size) { if (new_size <= file_size_) { return true; } + // The backing file must stay page-aligned so that O_DIRECT full-page reads + // never read past EOF. 
All current callers pass page-aligned targets. + assert(new_size % kVectorPageSize == 0 && + "extend_file target must be page-aligned for O_DIRECT correctness"); // Pre-validate against the page table's static capacity BEFORE mutating // any on-disk state. Otherwise a successful ftruncate followed by a // failed page_table_.extend() would leave the file size and the page @@ -617,5 +676,63 @@ void VecBufferPoolHandle::acquire_one(block_id_t block_id) { pool_.page_table_.acquire_block(block_id); } +void VecBufferPool::batch_prefetch(const block_id_t *page_ids, size_t count) { +#ifdef ZVEC_HAS_LIBAIO + if (count == 0) return; + + static thread_local ScopedIOContext tl_io_ctx; + if (!tl_io_ctx.valid()) return; + + std::vector reads; + std::vector> pending; + reads.reserve(count); + pending.reserve(count); + + for (size_t i = 0; i < count; ++i) { + block_id_t pid = page_ids[i]; + if (pid >= page_table_.entry_num()) continue; + char *existing = page_table_.acquire_block(pid); + if (existing) { + page_table_.release_block(pid); + continue; + } + char *buf = nullptr; + bool found = + MemoryLimitPool::get_instance().try_acquire_buffer(kVectorPageSize, buf); + if (!found || !buf) continue; + + AlignedRead rd; + rd.offset = static_cast(pid) * kVectorPageSize; + rd.len = kVectorPageSize; + rd.buf = buf; + reads.push_back(rd); + pending.emplace_back(pid, buf); + } + + if (reads.empty()) return; + + int rc = execute_aligned_io(tl_io_ctx.get(), fd_, reads); + if (rc != 0) { + for (auto &p : pending) { + MemoryLimitPool::get_instance().release_buffer(p.second, kVectorPageSize); + } + return; + } + + for (auto &p : pending) { + block_id_t pid = p.first; + char *buf = p.second; + size_t page_offset = static_cast(pid) * kVectorPageSize; + std::lock_guard lock( + block_mutexes_[pid % VecBufferPool::kMutexBucketCount]); + page_table_.set_block_acquired(pid, buf, page_offset); + page_table_.release_block(pid); + } +#else + (void)page_ids; + (void)count; +#endif +} + } // namespace 
ailego
 }  // namespace zvec
diff --git a/src/ailego/io/aligned_async_io.cc b/src/ailego/io/aligned_async_io.cc
new file mode 100644
index 000000000..83ac90ba0
--- /dev/null
+++ b/src/ailego/io/aligned_async_io.cc
@@ -0,0 +1,159 @@
+#include "aligned_async_io.h"
+
+#include
+#include
+
+#if defined(__linux__) && defined(ZVEC_HAS_LIBAIO)
+#include <libaio.h>
+
+namespace zvec {
+namespace ailego {
+
+static int execute_io_pread(int fd, std::vector<AlignedRead> &reads) {
+  for (auto &r : reads) {
+    ssize_t got = ::pread(fd, r.buf, r.len, static_cast<off_t>(r.offset));
+    if (got != static_cast<ssize_t>(r.len)) {
+      LOG_ERROR(
+          "pread fallback failed: fd=%d, offset=%llu, len=%llu, got=%zd, "
+          "errno=%d",
+          fd, (unsigned long long)r.offset, (unsigned long long)r.len, got,
+          errno);
+      return -1;
+    }
+  }
+  return 0;
+}
+
+static constexpr int kMaxEvents = 128;
+
+ScopedIOContext::ScopedIOContext() {
+  ctx_ = 0;
+  int ret = io_setup(kMaxEvents, &ctx_);
+  if (ret == 0) {
+    valid_ = true;
+  } else {
+    LOG_WARN("io_setup failed (ret=%d, errno=%d); prefetch disabled on thread",
+             ret, errno);
+    valid_ = false;
+  }
+}
+
+ScopedIOContext::~ScopedIOContext() {
+  if (valid_) {
+    io_destroy(ctx_);
+    valid_ = false;
+  }
+}
+
+// Reads every request in `reads` through the libaio context `ctx`.
+//
+// Requests are submitted in windows of at most kMaxEvents (the depth given
+// to io_setup) and completions are reaped between submissions, so a batch
+// larger than the queue does not spin on -EAGAIN. Each completion is matched
+// to its request through io_event::obj, since completions are not reported
+// in submission order. Every submitted request is reaped before the pread
+// fallback runs, so no buffer is written by two readers at once and no
+// stale event is left queued on a context that is reused by later calls.
+//
+// `n_retries` bounds consecutive submissions that queue nothing while no
+// request is in flight. Returns 0 on success; on any AIO failure the whole
+// batch is re-read with blocking pread() and that result is returned.
+int execute_aligned_io(IOContext ctx, int fd, std::vector<AlignedRead> &reads,
+                       uint64_t n_retries) {
+  if (reads.empty()) return 0;
+
+  const size_t total = reads.size();
+  const size_t window = static_cast<size_t>(kMaxEvents);
+  std::vector<iocb> cbs(total);
+  std::vector<iocb *> cb_ptrs(total);
+  std::vector<io_event> events(std::min(total, window));
+
+  for (size_t i = 0; i < total; ++i) {
+    io_prep_pread(&cbs[i], fd, reads[i].buf, reads[i].len,
+                  static_cast<long long>(reads[i].offset));
+    cb_ptrs[i] = &cbs[i];
+  }
+
+  size_t n_submitted = 0;
+  size_t n_completed = 0;
+  uint64_t stalls = 0;
+  bool failed = false;
+
+  for (;;) {
+    size_t in_flight = n_submitted - n_completed;
+
+    if (!failed && n_submitted < total && in_flight < window) {
+      const size_t batch = std::min(total - n_submitted, window - in_flight);
+      const int ret = io_submit(ctx, static_cast<long>(batch),
+                                cb_ptrs.data() + n_submitted);
+      if (ret > 0) {
+        n_submitted += static_cast<size_t>(ret);
+        in_flight += static_cast<size_t>(ret);
+        stalls = 0;
+      } else if (ret == 0 || ret == -EAGAIN) {
+        // Queue full. With requests in flight the reap below frees slots;
+        // with none in flight only a bounded number of retries is allowed.
+        if (in_flight == 0 && ++stalls > n_retries) {
+          LOG_WARN("io_submit incomplete after retries; fallback to pread");
+          failed = true;
+        }
+      } else if (ret != -EINTR) {
+        LOG_WARN("io_submit failed: ret=%d; fallback to pread", ret);
+        failed = true;
+      }
+    }
+
+    if (in_flight == 0) {
+      if (failed || n_submitted == total) break;
+      continue;  // nothing queued yet (EINTR/EAGAIN): retry the submission
+    }
+
+    const int got = io_getevents(ctx, 1, static_cast<long>(in_flight),
+                                 events.data(), nullptr);
+    if (got == -EINTR) continue;
+    if (got <= 0) {
+      LOG_WARN("io_getevents failed: ret=%d; fallback to pread", got);
+      return execute_io_pread(fd, reads);
+    }
+
+    for (int i = 0; i < got; ++i) {
+      const size_t idx = static_cast<size_t>(events[i].obj - cbs.data());
+      if (static_cast<uint64_t>(events[i].res) != reads[idx].len) {
+        LOG_WARN(
+            "aio short read: expected=%llu, got=%lld; fallback to pread",
+            (unsigned long long)reads[idx].len, (long long)events[i].res);
+        failed = true;
+      }
+    }
+    n_completed += static_cast<size_t>(got);
+  }
+
+  return failed ? execute_io_pread(fd, reads) : 0;
+}
+
+}  // namespace ailego
+}  // namespace zvec
+
+#else
+
+namespace zvec {
+namespace ailego {
+
+ScopedIOContext::ScopedIOContext() { valid_ = false; }
+ScopedIOContext::~ScopedIOContext() {}
+
+int execute_aligned_io(IOContext, int fd, std::vector<AlignedRead> &reads,
+                       uint64_t) {
+  for (auto &r : reads) {
+    ssize_t got = ::pread(fd, r.buf, r.len, static_cast<off_t>(r.offset));
+    if (got != static_cast<ssize_t>(r.len)) {
+      return -1;
+    }
+  }
+  return 0;
+}
+
+}  // namespace ailego
+}  // namespace zvec
+
+#endif
diff --git a/src/ailego/io/aligned_async_io.h b/src/ailego/io/aligned_async_io.h
new file mode 100644
index 000000000..ac486be14
--- /dev/null
+++ b/src/ailego/io/aligned_async_io.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#ifdef ZVEC_HAS_LIBAIO
+#include <libaio.h>
+#endif
+
+namespace zvec {
+namespace ailego {
+
+struct AlignedRead {
+  uint64_t offset;
+  uint64_t len;
+  void *buf;
+};
+
+#ifdef ZVEC_HAS_LIBAIO
+using IOContext = io_context_t;
+#else
+using IOContext = uint32_t;
+#endif
+
+class ScopedIOContext
{ + public: + ScopedIOContext(); + ~ScopedIOContext(); + ScopedIOContext(const ScopedIOContext &) = delete; + ScopedIOContext &operator=(const ScopedIOContext &) = delete; + + bool valid() const { return valid_; } + IOContext &get() { return ctx_; } + + private: + IOContext ctx_{}; + bool valid_{false}; +}; + +int execute_aligned_io(IOContext ctx, int fd, std::vector &reads, + uint64_t n_retries = 3); + +} // namespace ailego +} // namespace zvec diff --git a/src/core/algorithm/hnsw/hnsw_algorithm.cc b/src/core/algorithm/hnsw/hnsw_algorithm.cc index 8c6fcfe17..50d558bda 100644 --- a/src/core/algorithm/hnsw/hnsw_algorithm.cc +++ b/src/core/algorithm/hnsw/hnsw_algorithm.cc @@ -176,7 +176,10 @@ void HnswAlgorithm::add_neighbors(node_id_t id, level_t level, // // Two specialized inner loops, dispatched from search_neighbors(): // -// fast_search_neighbors: mmap/contiguous with direct vector pointers. +// fast_search_neighbors: level-0 unfiltered search for all storage +// modes (mmap/BufferPool/contiguous). Vector +// resolution delegated to entity via +// resolve_vectors()/release_vectors(). // Uses BlockHeap (AVX2) or LinearPool (scalar) // for visited tracking and top-k maintenance. // dual_heap_search_neighbors: CandidateHeap + TopkHeap + VisitFilter. @@ -184,16 +187,13 @@ void HnswAlgorithm::add_neighbors(node_id_t id, level_t level, // search, upper levels, and BufferPool fallback. // ============================================================================ -// mmap/contiguous variant: resolve vectors via get_vector_ptr and use -// LinearPool or BlockHeap for visited tracking + top-k maintenance. -// HeapType must expose reset/set_visited/check_visited/push_block/has_next/pop. 
template void fast_search_neighbors(const EntityType &entity, HeapType &pool, VisitFilter &visit, HnswDistCalculator &dc, uint32_t topk, uint32_t ef, node_id_t entry_point, dist_t entry_dist, uint32_t prefetch_lines, uint32_t prefetch_offset) { - const uint32_t max_deg = entity.max_degree(0); // level 0 only + const uint32_t max_deg = entity.max_degree(0); const uint32_t cap = std::max(topk, ef); pool.reset(static_cast(cap), static_cast(max_deg)); visit.clear(); @@ -219,41 +219,35 @@ void fast_search_neighbors(const EntityType &entity, HeapType &pool, neighbor_vecs.resize(buf_capacity); } - const uint32_t po = - std::min(static_cast(neighbors.size()), prefetch_offset); uint32_t unvisited_count = 0; - uint32_t i = 0; - - // Phase 1: scan first `po` neighbors with prefetch. - for (; i < po; ++i) { + for (uint32_t i = 0; i < neighbors.size(); ++i) { node_id_t node = neighbors[i]; if (visit.visited(node)) continue; visit.set_visited(node); - const void *vec_ptr = entity.get_vector_ptr(node); - const char *p = reinterpret_cast(vec_ptr); + neighbor_ids[unvisited_count++] = node; + } + + if (unvisited_count == 0) continue; + + if (ailego_unlikely(entity.resolve_vectors(neighbor_ids.data(), + unvisited_count, + neighbor_vecs.data()) != 0)) + break; + + const uint32_t po = std::min(prefetch_offset, unvisited_count); + for (uint32_t i = 0; i < po; ++i) { + const char *p = static_cast(neighbor_vecs[i]); for (uint32_t cl = 0; cl < prefetch_lines; ++cl) { ailego_prefetch(p + cl * 64); } - neighbor_ids[unvisited_count] = node; - neighbor_vecs[unvisited_count] = vec_ptr; - unvisited_count++; - } - - // Phase 2: scan remaining neighbors. 
- for (; i < neighbors.size(); ++i) { - node_id_t node = neighbors[i]; - if (visit.visited(node)) continue; - visit.set_visited(node); - neighbor_ids[unvisited_count] = node; - neighbor_vecs[unvisited_count] = entity.get_vector_ptr(node); - unvisited_count++; } - if (unvisited_count == 0) continue; dc.batch_dist(neighbor_vecs.data(), unvisited_count, dists.data()); pool.push_block(dists.data(), neighbor_ids.data(), static_cast(unvisited_count)); + + entity.release_vectors(); } } @@ -379,9 +373,7 @@ void dual_heap_search_neighbors(const EntityType &entity, level_t level, // search_neighbors: Dispatch to fast or dual-heap path. // // - add_node / filtered / upper levels → dual_heap_search_neighbors -// - level-0 unfiltered search: -// MmapMemoryBlock → fast_search_neighbors (BlockHeap/LinearPool) -// BufferPool → dual_heap_search_neighbors (fallback) +// - level-0 unfiltered search → fast_search_neighbors // ============================================================================ template void HnswAlgorithm::search_neighbors(level_t level, @@ -393,7 +385,6 @@ void HnswAlgorithm::search_neighbors(level_t level, HnswDistCalculator &dc = ctx->dist_calculator(); if (!use_pool || ctx->filter().is_valid() || level != 0) { - // Dual-heap path: add_node, filtered search, or upper-level scan. auto run_with_filter = [&](auto &&filter) { dual_heap_search_neighbors( entity, level, entry_point, dist, topk, ctx, dc, @@ -410,36 +401,24 @@ void HnswAlgorithm::search_neighbors(level_t level, run_with_filter(filter); } } else { - // Pool-based path for level-0 unfiltered search. - if constexpr (std::is_same_v) { - const uint32_t prefetch_lines = - ctx->pl() > 0 ? ctx->pl() : (entity.vector_size() + 63) / 64; - - // Fast path: direct pointer access via get_vector_ptr. - // BlockHeap (AVX2) or LinearPool (scalar) for top-k tracking. 
- const uint32_t topk_v = static_cast(ctx->topk()); - const uint32_t ef_v = ctx->ef(); - const bool avx2_ok = - zvec::ailego::internal::CpuFeatures::static_flags_.AVX2; - - auto &visit = ctx->visit_filter(); - - if (avx2_ok) { - auto &bpool = ctx->block_pool(); - fast_search_neighbors(entity, bpool, visit, dc, topk_v, ef_v, - *entry_point, *dist, prefetch_lines, ctx->po()); - copy_pool_to_topk(bpool, topk); - } else { - auto &lpool = ctx->pool(); - fast_search_neighbors(entity, lpool, visit, dc, topk_v, ef_v, - *entry_point, *dist, prefetch_lines, ctx->po()); - copy_pool_to_topk(lpool, topk); - } + const uint32_t prefetch_lines = + ctx->pl() > 0 ? ctx->pl() : (entity.vector_size() + 63) / 64; + const uint32_t topk_v = static_cast(ctx->topk()); + const uint32_t ef_v = ctx->ef(); + const bool avx2_ok = + zvec::ailego::internal::CpuFeatures::static_flags_.AVX2; + auto &visit = ctx->visit_filter(); + + if (avx2_ok) { + auto &bpool = ctx->block_pool(); + fast_search_neighbors(entity, bpool, visit, dc, topk_v, ef_v, + *entry_point, *dist, prefetch_lines, ctx->po()); + copy_pool_to_topk(bpool, topk); } else { - // BufferPool entities: fallback to dual-heap path. 
- auto filter = [](node_id_t) { return false; }; - dual_heap_search_neighbors( - entity, level, entry_point, dist, topk, ctx, dc, filter); + auto &lpool = ctx->pool(); + fast_search_neighbors(entity, lpool, visit, dc, topk_v, ef_v, + *entry_point, *dist, prefetch_lines, ctx->po()); + copy_pool_to_topk(lpool, topk); } } } diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc index 50f15c3ff..2e8099b37 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc @@ -811,6 +811,40 @@ const HnswEntity::Pointer HnswMmapStreamerEntity::clone() const { return HnswEntity::Pointer(entity); } +const HnswEntity::Pointer HnswBufferPoolStreamerEntity::clone() const { + std::vector node_chunks; + node_chunks.reserve(node_chunks_.size()); + for (size_t i = 0UL; i < node_chunks_.size(); ++i) { + node_chunks.emplace_back(node_chunks_[i]->clone()); + if (ailego_unlikely(!node_chunks[i])) { + LOG_ERROR("HnswBufferPoolStreamerEntity get chunk failed in clone"); + return HnswEntity::Pointer(); + } + } + + std::vector upper_neighbor_chunks; + upper_neighbor_chunks.reserve(upper_neighbor_chunks_.size()); + for (size_t i = 0UL; i < upper_neighbor_chunks_.size(); ++i) { + upper_neighbor_chunks.emplace_back(upper_neighbor_chunks_[i]->clone()); + if (ailego_unlikely(!upper_neighbor_chunks[i])) { + LOG_ERROR("HnswBufferPoolStreamerEntity get chunk failed in clone"); + return HnswEntity::Pointer(); + } + } + + auto *entity = new (std::nothrow) HnswBufferPoolStreamerEntity( + stats_, header(), chunk_size_, node_index_mask_bits_, + upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_, node_chunk_bases_, + upper_neighbor_chunk_bases_); + if (ailego_unlikely(!entity)) { + LOG_ERROR("HnswBufferPoolStreamerEntity 
new failed"); + } + return HnswEntity::Pointer(entity); +} + const HnswEntity::Pointer HnswContiguousStreamerEntity::clone() const { std::vector node_chunks; node_chunks.reserve(node_chunks_.size()); diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 19f8ba161..667840698 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -14,7 +14,11 @@ #pragma once +#include +#include #include +#include +#include #include #include #include @@ -859,14 +863,21 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { return *reinterpret_cast(base + offset); } - //! Direct vector pointer access (no MemoryBlock wrapper). - //! For use in the merged search loop to avoid intermediate allocations. ailego_force_inline const void *get_vector_ptr(node_id_t id) const { uint32_t chunk_idx = id >> node_index_mask_bits_; uint32_t offset = (id & node_index_mask_) * node_size(); return get_node_chunk_base(chunk_idx) + offset; } + ailego_force_inline int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + for (uint32_t i = 0; i < count; ++i) + out[i] = get_vector_ptr(ids[i]); + return 0; + } + + ailego_force_inline void release_vectors() const {} + protected: //! Get cached base address for a node chunk, syncing if needed ailego_force_inline const char *get_node_chunk_base( @@ -912,7 +923,6 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { mutable std::vector upper_neighbor_chunk_bases_{}; }; -//! Typed entity subclass for buffer pool mode. 
class HnswBufferPoolStreamerEntity : public HnswStreamerEntity {
  public:
   using MemoryBlock = BufferPoolMemoryBlock;
@@ -924,6 +934,8 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity {
     return HnswStorageMode::kBufferPool;
   }
 
+  const HnswEntity::Pointer clone() const override;
+
   inline TypedNeighbors get_neighbors_typed(level_t level,
                                             node_id_t id) const {
     return HnswStreamerEntity::get_neighbors_typed(level, id);
@@ -939,6 +951,142 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity {
   inline key_t get_key_typed(node_id_t id) const {
     return HnswStreamerEntity::get_key_typed(id);
   }
+
+  //! Resolve `count` node ids into vector pointers stored in `out`.
+  //! Pages are pinned through pinned_pages_ and stay pinned until
+  //! release_vectors(); a vector that straddles two pages is copied into
+  //! cross_page_arena_. Returns 0 on success. On failure returns -1 after
+  //! dropping every pin, so a caller that bails out without calling
+  //! release_vectors() leaves no page pinned.
+  int resolve_vectors(const node_id_t *ids, uint32_t count,
+                      const void **out) const {
+    ensure_pinned_pages();
+    if (ailego_unlikely(!pinned_pages_.bound())) return -1;
+    const size_t vec_sz = vector_size();
+    const size_t pg_sz = ailego::kVectorPageSize;
+    cross_page_used_ = 0;
+    if (cross_page_arena_.size() < count * vec_sz)
+      cross_page_arena_.resize(count * vec_sz);
+    for (uint32_t i = 0; i < count; ++i) {
+      const size_t abs_off = get_vector_abs_offset(ids[i]);
+      const auto page_id = static_cast<ailego::block_id_t>(abs_off / pg_sz);
+      const size_t intra = abs_off % pg_sz;
+      if (ailego_likely(intra + vec_sz <= pg_sz)) {
+        char *page = pinned_pages_.get_page(page_id);
+        if (ailego_unlikely(!page)) {
+          pinned_pages_.release_all();
+          return -1;
+        }
+        out[i] = page + intra;
+      } else {
+        const size_t part1 = pg_sz - intra;
+        char *p1 = pinned_pages_.get_page(page_id);
+        char *p2 = pinned_pages_.get_page(page_id + 1);
+        if (ailego_unlikely(!p1 || !p2)) {
+          pinned_pages_.release_all();
+          return -1;
+        }
+        char *scratch = cross_page_arena_.data() + cross_page_used_ * vec_sz;
+        ++cross_page_used_;
+        std::memcpy(scratch, p1 + intra, part1);
+        std::memcpy(scratch + part1, p2, vec_sz - part1);
+        out[i] = scratch;
+      }
+    }
+    return 0;
+  }
+
+  //! Unpin every page pinned by resolve_vectors().
+  void release_vectors() const {
+    pinned_pages_.release_all();
+  }
+
+ private:
+  //! Fixed-capacity open-addressing table (linear probing) mapping a page id
+  //! to the buffer returned by VecBufferPool::acquire_buffer().
+  struct PinnedPageSet {
+    static constexpr size_t kCapacity = 128;
+    static constexpr size_t kMask = kCapacity - 1;
+    static constexpr ailego::block_id_t kEmpty =
+        std::numeric_limits<ailego::block_id_t>::max();
+
+    PinnedPageSet() { reset_table(); }
+    ~PinnedPageSet() { release_all(); }
+    PinnedPageSet(const PinnedPageSet &) = delete;
+    PinnedPageSet &operator=(const PinnedPageSet &) = delete;
+
+    void bind(ailego::VecBufferPool *pool) { pool_ = pool; }
+    bool bound() const { return pool_ != nullptr; }
+
+    //! Return the buffer of `page_id`, acquiring it on first use. Returns
+    //! nullptr when the pool cannot supply the page or when all kCapacity
+    //! slots are taken by other pages.
+    char *get_page(ailego::block_id_t page_id) {
+      size_t slot = static_cast<size_t>(page_id) & kMask;
+      // Visit each slot at most once: an unbounded probe never terminates
+      // once the table is full and `page_id` is not in it.
+      for (size_t probes = 0; probes < kCapacity; ++probes) {
+        if (ids_[slot] == page_id) return bufs_[slot];
+        if (ids_[slot] == kEmpty) {
+          char *buf = pool_->acquire_buffer(page_id, 50);
+          if (ailego_unlikely(!buf)) return nullptr;
+          ids_[slot] = page_id;
+          bufs_[slot] = buf;
+          ++count_;
+          return buf;
+        }
+        slot = (slot + 1) & kMask;
+      }
+      return nullptr;
+    }
+
+    //! Call release_block() for every occupied slot and empty the table.
+    void release_all() {
+      if (!pool_) return;
+      for (size_t i = 0; i < kCapacity; ++i) {
+        if (ids_[i] != kEmpty) {
+          pool_->page_table_.release_block(ids_[i]);
+          ids_[i] = kEmpty;
+          bufs_[i] = nullptr;
+        }
+      }
+      count_ = 0;
+    }
+
+   private:
+    void reset_table() {
+      std::fill_n(ids_, kCapacity, kEmpty);
+      std::fill_n(bufs_, kCapacity, nullptr);
+      count_ = 0;
+    }
+    ailego::VecBufferPool *pool_{nullptr};
+    ailego::block_id_t ids_[kCapacity];
+    char *bufs_[kCapacity];
+    size_t count_{0};
+  };
+
+  ailego::VecBufferPool *vec_buffer_pool() const {
+    if (broker_ && broker_->storage()) {
+      return broker_->storage()->vec_buffer_pool();
+    }
+    return nullptr;
+  }
+
+  size_t get_vector_abs_offset(node_id_t id) const {
+    auto loc = get_vector_chunk_loc(id);
+    return node_chunks_[loc.first]->abs_data_offset() + loc.second;
+  }
+
+  void ensure_pinned_pages() const {
+    if (!pinned_pages_.bound()) {
+      auto *pool = vec_buffer_pool();
+      if (pool) pinned_pages_.bind(pool);
+    }
+  }
+
+  mutable PinnedPageSet pinned_pages_;
+  mutable std::vector<char> cross_page_arena_;
+  mutable uint32_t cross_page_used_{0};
 };
 
 //! Typed entity subclass for contiguous memory mode.
@@ -1048,18 +1196,24 @@ class HnswContiguousStreamerEntity : public HnswMmapStreamerEntity {
     return HnswMmapStreamerEntity::get_key_typed(id);
   }
 
-  //! Direct vector pointer from flat vector array (stride = vector_size).
-  //! For use in the merged search loop to avoid intermediate allocations.
   ailego_force_inline const void *get_vector_ptr(node_id_t id) const {
     if (ailego_likely(vector_base_ != nullptr)) {
       return vector_base_ + static_cast(id) * vector_size();
     }
-    // Fallback to mmap chunk-based access
     uint32_t chunk_idx = id >> node_index_mask_bits_;
     uint32_t offset = (id & node_index_mask_) * node_size();
     return get_node_chunk_base(chunk_idx) + offset;
   }
 
+  ailego_force_inline int resolve_vectors(const node_id_t *ids, uint32_t count,
+                                          const void **out) const {
+    for (uint32_t i = 0; i < count; ++i)
+      out[i] = get_vector_ptr(ids[i]);
+    return 0;
+  }
+
+  ailego_force_inline void release_vectors() const {}
+
  protected:
   //! Custom deleter for contiguous memory (munmap / _aligned_free / free)
   //! Used by shared_ptr to properly release mmap'd memory.
diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc
index 3db6c58d9..50b5bde8f 100644
--- a/src/core/utility/buffer_storage.cc
+++ b/src/core/utility/buffer_storage.cc
@@ -449,6 +449,12 @@ class BufferStorage : public IndexStorage {
       return shared_from_this();
     }
 
+    size_t abs_data_offset(void) const override {
+      return segment_info_->segment_header_start_offset +
+             segment_info_->segment_header->content_offset +
+             segment_info_->segment.meta()->data_index;
+    }
+
    protected:
     friend BufferStorage;
     // Pointer into BufferStorage::segments_ (unordered_map mapped value).
@@ -481,6 +487,8 @@ class BufferStorage : public IndexStorage {
     if (val != 0) {
       segment_meta_capacity_ = val;
     }
+    // O_DIRECT is opt-in: take the value from params, never force it on.
+    params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_);
     return 0;
   }
 
@@ -509,7 +517,8 @@ class BufferStorage : public IndexStorage {
     // Open in writable mode when the caller expects to modify the index
     // (create_if_missing=true implies write intent, same as MMapFileStorage).
     buffer_pool_ = std::make_shared(
-        path, /*writable=*/create_if_missing);
+        path, /*writable=*/create_if_missing,
+        /*enable_direct_io=*/enable_direct_io_);
     buffer_pool_handle_ = std::make_shared(
         buffer_pool_->get_handle());
     int ret = ParseToMapping();
@@ -804,6 +813,10 @@ class BufferStorage : public IndexStorage {
     return chain_headers_.front()->magic;
   }
 
+  ailego::VecBufferPool *vec_buffer_pool(void) const override {
+    return buffer_pool_.get();
+  }
+
  protected:
   //! Initialize index version segment (writes content into an IndexMapping).
   //! Only intended to be called from init_index() while `mapping` is still
@@ -1530,6 +1543,10 @@ class BufferStorage : public IndexStorage {
   // init_index().
   uint32_t segment_meta_capacity_{4096u};
 
+  // When true, the page-data fd is opened with O_DIRECT (metadata fd stays
+  // buffered). Defaults to false: identical behaviour to the legacy path.
+  bool enable_direct_io_{false};
+
   // Per-header-chain file offsets used by flush_index() and append_segment().
   struct MetaChain {
     uint64_t header_start_offset;
diff --git a/src/core/utility/utility_params.h b/src/core/utility/utility_params.h
index c57e6e980..1b8ba2cef 100644
--- a/src/core/utility/utility_params.h
+++ b/src/core/utility/utility_params.h
@@ -72,6 +72,10 @@ static const std::string MMAPFILE_STORAGE_FORCE_FLUSH =
 static const std::string MMAPFILE_STORAGE_SEGMENT_META_CAPACITY =
     "proxima.mmap_file.storage.segment_meta_capacity";
 
+//! 
BufferStorage +static const std::string BUFFER_STORAGE_ENABLE_DIRECT_IO = + "proxima.buffer.storage.enable_direct_io"; + //! MipsConverter static const std::string MIPS_CONVERTER_M_VALUE = "proxima.mips.converter.m_value"; diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 02d19bbc7..755c53d24 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -202,7 +202,8 @@ class VecBufferPool { static constexpr size_t kMutexBucketCount = 64UL * 1024UL; - VecBufferPool(const std::string &filename, bool writable = false); + VecBufferPool(const std::string &filename, bool writable = false, + bool enable_direct_io = false); ~VecBufferPool() { // Flush any remaining dirty blocks before tearing down memory/fd so that // writes are not silently lost. Safe to call even in read-only mode. @@ -213,8 +214,10 @@ class VecBufferPool { } #if defined(_MSC_VER) _close(fd_); + _close(meta_fd_); #else close(fd_); + close(meta_fd_); #endif } @@ -253,11 +256,17 @@ class VecBufferPool { return file_size_; } + void batch_prefetch(const block_id_t *page_ids, size_t count); + + int fd() const { return fd_; } + private: - int fd_; + int fd_; // page-data channel: may carry O_DIRECT + int meta_fd_; // metadata channel: always buffered IO size_t file_size_; std::string file_name_; bool writable_{false}; + bool direct_io_enabled_{false}; // whether O_DIRECT actually took effect public: VectorPageTable page_table_; @@ -293,6 +302,10 @@ class VecBufferPoolHandle { void acquire_one(block_id_t block_id); + void batch_prefetch(const block_id_t *page_ids, size_t count) { + pool_.batch_prefetch(page_ids, count); + } + private: VecBufferPool &pool_; }; diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index 049c79917..ca4e24dbb 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ 
b/src/include/zvec/core/framework/index_storage.h @@ -332,6 +332,10 @@ class IndexStorage : public IndexModule { virtual const uint8_t *base_data(void) const { return nullptr; } + + virtual size_t abs_data_offset(void) const { + return 0; + } }; //! Destructor @@ -399,6 +403,10 @@ class IndexStorage : public IndexModule { virtual std::string file_path(void) const { return ""; } + + virtual ailego::VecBufferPool *vec_buffer_pool(void) const { + return nullptr; + } }; } // namespace core