Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/ailego/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ set(EXTRA_LIBS ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS})

# Extra link dependencies that only apply to non-Apple UNIX targets.
if(UNIX AND NOT APPLE)
list(APPEND EXTRA_LIBS ${LIB_RT})
# libaio is optional: it is appended to EXTRA_LIBS only when find_library()
# locates it; otherwise LIB_AIO evaluates to false and nothing is added.
# NOTE(review): only the library is probed here, not the libaio.h header --
# confirm the header is guaranteed to be present wherever the library is.
find_library(LIB_AIO NAMES aio)
if(LIB_AIO)
list(APPEND EXTRA_LIBS ${LIB_AIO})
endif()
endif()

if(NOT ANDROID AND AUTO_DETECT_ARCH)
Expand Down Expand Up @@ -123,3 +127,8 @@ cc_library(
LIBS ${EXTRA_LIBS}
VERSION "${GIT_SRCS_VER}"
)

# When libaio was located, define ZVEC_HAS_LIBAIO on zvec_ailego. The
# definition is PUBLIC, so it also applies to targets that link zvec_ailego.
if(LIB_AIO)
target_compile_definitions(zvec_ailego PUBLIC ZVEC_HAS_LIBAIO)
message(STATUS "Found libaio: ${LIB_AIO} (HNSW async prefetch enabled)")
endif()
143 changes: 130 additions & 13 deletions src/ailego/buffer/vector_page_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
// limitations under the License.

#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <thread>
#include <ailego/utility/memory_helper.h>
#include <zvec/ailego/buffer/vector_page_table.h>
#include <zvec/core/framework/index_logger.h>
#include "../io/aligned_async_io.h"
#include <zvec/ailego/logger/logger.h>

#if defined(_MSC_VER)
Expand Down Expand Up @@ -272,27 +275,71 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer,
}
}

VecBufferPool::VecBufferPool(const std::string &filename, bool writable) {
VecBufferPool::VecBufferPool(const std::string &filename, bool writable,
bool enable_direct_io) {
file_name_ = filename;
writable_ = writable;
#if defined(_MSC_VER)
int flags = writable_ ? (O_RDWR | _O_BINARY) : (O_RDONLY | _O_BINARY);
fd_ = _open(filename.c_str(), flags, 0644);
meta_fd_ = _open(filename.c_str(), flags, 0644);
(void)enable_direct_io; // O_DIRECT not supported on this path
#else
int flags = writable_ ? O_RDWR : O_RDONLY;
fd_ = ::open(filename.c_str(), flags, 0644);
int base_flags = writable_ ? O_RDWR : O_RDONLY;
// Metadata channel: always buffered IO. Serves the unaligned
// header/footer/segment_meta reads & writes and benefits from page cache.
meta_fd_ = ::open(filename.c_str(), base_flags, 0644);
// Page-data channel: optionally O_DIRECT; fall back to buffered open when
// the filesystem (tmpfs/overlayfs/...) rejects O_DIRECT.
int data_flags = base_flags;
#ifdef O_DIRECT
if (enable_direct_io) {
data_flags |= O_DIRECT;
}
#endif
fd_ = ::open(filename.c_str(), data_flags, 0644);
#ifdef O_DIRECT
if (fd_ < 0 && (data_flags & O_DIRECT)) {
LOG_WARN(
"VecBufferPool: open with O_DIRECT failed for file[%s] (errno=%d), "
"falling back to buffered IO",
filename.c_str(), errno);
fd_ = ::open(filename.c_str(), base_flags, 0644);
direct_io_enabled_ = false;
} else {
direct_io_enabled_ = (data_flags & O_DIRECT) != 0;
}
#else
(void)enable_direct_io;
#endif
#endif
if (fd_ < 0 || meta_fd_ < 0) {
if (fd_ >= 0) {
#if defined(_MSC_VER)
_close(fd_);
#else
::close(fd_);
#endif
if (fd_ < 0) {
}
if (meta_fd_ >= 0) {
#if defined(_MSC_VER)
_close(meta_fd_);
#else
::close(meta_fd_);
#endif
}
throw std::runtime_error("Failed to open file: " + filename);
}
#if defined(_MSC_VER)
struct _stat64 st;
if (_fstat64(fd_, &st) < 0) {
_close(fd_);
_close(meta_fd_);
#else
struct stat st;
if (fstat(fd_, &st) < 0) {
::close(fd_);
::close(meta_fd_);
#endif
throw std::runtime_error("Failed to stat file: " + filename);
}
Expand Down Expand Up @@ -375,24 +422,32 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) {
}

size_t page_offset = page_id * kVectorPageSize;
size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset);
if (expected_bytes < kVectorPageSize) {
std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes);
}
ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset);
if (read_bytes != static_cast<ssize_t>(expected_bytes)) {
// O_DIRECT requires the IO length to be a multiple of the device block
// size. The backing file size is always page-aligned (IndexMapping +
// append_segment guarantee this), so reading a full page never reads past
// EOF; the tail padding is the file's own zero region. In direct mode we
// MUST read the whole page; the buffered path keeps the legacy short-read
// + zero-pad behaviour.
size_t read_len = direct_io_enabled_
? kVectorPageSize
: std::min(kVectorPageSize, file_size_ - page_offset);
if (read_len < kVectorPageSize) {
std::memset(buffer + read_len, 0, kVectorPageSize - read_len);
}
ssize_t read_bytes = zvec_pread(fd_, buffer, read_len, page_offset);
if (read_bytes != static_cast<ssize_t>(read_len)) {
LOG_ERROR(
"Buffer pool failed to read file at offset: file[%s], page_id[%zu], "
"offset[%zu], expected[%zu], got[%zd]",
file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes);
file_name_.c_str(), page_id, page_offset, read_len, read_bytes);
MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize);
return nullptr;
}
return page_table_.set_block_acquired(page_id, buffer, page_offset);
}

int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) {
ssize_t read_bytes = zvec_pread(fd_, buffer, length, offset);
ssize_t read_bytes = zvec_pread(meta_fd_, buffer, length, offset);
if (read_bytes != static_cast<ssize_t>(length)) {
LOG_ERROR(
"Buffer pool failed to read file at offset: file[%s], offset[%zu], "
Expand Down Expand Up @@ -446,7 +501,7 @@ int VecBufferPool::write_meta(size_t offset, size_t length,
file_name_.c_str());
return -1;
}
ssize_t w = zvec_pwrite(fd_, buffer, length, offset);
ssize_t w = zvec_pwrite(meta_fd_, buffer, length, offset);
if (w != static_cast<ssize_t>(length)) {
LOG_ERROR(
"Buffer pool failed to write meta: file[%s], offset[%zu], "
Expand Down Expand Up @@ -495,6 +550,10 @@ bool VecBufferPool::extend_file(size_t new_size) {
if (new_size <= file_size_) {
return true;
}
// The backing file must stay page-aligned so that O_DIRECT full-page reads
// never read past EOF. All current callers pass page-aligned targets.
assert(new_size % kVectorPageSize == 0 &&
"extend_file target must be page-aligned for O_DIRECT correctness");
// Pre-validate against the page table's static capacity BEFORE mutating
// any on-disk state. Otherwise a successful ftruncate followed by a
// failed page_table_.extend() would leave the file size and the page
Expand Down Expand Up @@ -617,5 +676,63 @@ void VecBufferPoolHandle::acquire_one(block_id_t block_id) {
pool_.page_table_.acquire_block(block_id);
}

// Best-effort warm-up of a set of pages with one batched read.
//
// For every id in [page_ids, page_ids + count) that is inside the page table
// and for which acquire_block() currently returns no buffer, a page buffer is
// taken from MemoryLimitPool and a kVectorPageSize read at the page's file
// offset is queued. All queued reads are issued through execute_aligned_io()
// on fd_; on success each buffer is handed to the page table and the block is
// released again right away.
//
// The call returns without doing anything when the build has no libaio
// (ZVEC_HAS_LIBAIO undefined), when count is 0, when this thread's
// ScopedIOContext is not valid, or when no page needs loading. Pages for which
// no buffer can be obtained are skipped. If the batched read reports failure,
// every buffer obtained for the batch goes back to the pool and no page is
// installed.
//
// NOTE(review): every request reads a full kVectorPageSize regardless of
// direct_io_enabled_ -- confirm the backing file is always page-aligned.
void VecBufferPool::batch_prefetch(const block_id_t *page_ids, size_t count) {
#ifdef ZVEC_HAS_LIBAIO
  if (count == 0) {
    return;
  }

  // One AIO context per calling thread, created on first use.
  static thread_local ScopedIOContext tl_io_ctx;
  if (!tl_io_ctx.valid()) {
    return;
  }

  // `staged` remembers which buffer belongs to which page; `requests` is the
  // matching list of reads handed to the IO layer.
  std::vector<std::pair<block_id_t, char *>> staged;
  std::vector<AlignedRead> requests;
  staged.reserve(count);
  requests.reserve(count);

  for (const block_id_t *it = page_ids; it != page_ids + count; ++it) {
    const block_id_t page = *it;
    if (page >= page_table_.entry_num()) {
      continue;
    }

    // A non-null result means the page already has a buffer: undo the
    // acquire and move on.
    if (page_table_.acquire_block(page) != nullptr) {
      page_table_.release_block(page);
      continue;
    }

    char *page_buf = nullptr;
    const bool granted = MemoryLimitPool::get_instance().try_acquire_buffer(
        kVectorPageSize, page_buf);
    if (!(granted && page_buf)) {
      continue;
    }

    AlignedRead request;
    request.offset = static_cast<uint64_t>(page) * kVectorPageSize;
    request.len = kVectorPageSize;
    request.buf = page_buf;
    staged.emplace_back(page, page_buf);
    requests.push_back(request);
  }

  if (requests.empty()) {
    return;
  }

  if (execute_aligned_io(tl_io_ctx.get(), fd_, requests) != 0) {
    // The batch failed: give every buffer back to the pool.
    for (const auto &entry : staged) {
      MemoryLimitPool::get_instance().release_buffer(entry.second,
                                                     kVectorPageSize);
    }
    return;
  }

  for (const auto &entry : staged) {
    const block_id_t page = entry.first;
    const size_t file_offset = static_cast<size_t>(page) * kVectorPageSize;
    // Install the buffer under the page's bucket mutex, then drop the
    // reference again: prefetch does not keep the page acquired.
    std::lock_guard<std::mutex> guard(
        block_mutexes_[page % VecBufferPool::kMutexBucketCount]);
    page_table_.set_block_acquired(page, entry.second, file_offset);
    page_table_.release_block(page);
  }
#else
  (void)page_ids;
  (void)count;
#endif
}

} // namespace ailego
} // namespace zvec
140 changes: 140 additions & 0 deletions src/ailego/io/aligned_async_io.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include "aligned_async_io.h"

#include <cerrno>
#include <zvec/core/framework/index_logger.h>

#if defined(__linux__) && defined(ZVEC_HAS_LIBAIO)
#include <unistd.h>

namespace zvec {
namespace ailego {

static int execute_io_pread(int fd, std::vector<AlignedRead> &reads) {
for (auto &r : reads) {
ssize_t got = ::pread(fd, r.buf, r.len, static_cast<off_t>(r.offset));
if (got != static_cast<ssize_t>(r.len)) {
LOG_ERROR(
"pread fallback failed: fd=%d, offset=%llu, len=%llu, got=%zd, "
"errno=%d",
fd, (unsigned long long)r.offset, (unsigned long long)r.len, got,
errno);
return -1;
}
}
return 0;
}

// Event capacity requested from io_setup() for each ScopedIOContext; it also
// caps how many requests execute_aligned_io() passes to one io_submit() call.
static constexpr int kMaxEvents = 128;

// Creates an AIO context with room for kMaxEvents events. On failure the
// object stays usable but reports valid() == false, which callers treat as
// "async prefetch unavailable on this thread".
ScopedIOContext::ScopedIOContext() {
  ctx_ = 0;  // io_setup() requires the context to be zero-initialised
  const int ret = io_setup(kMaxEvents, &ctx_);
  valid_ = (ret == 0);
  if (!valid_) {
    // The libaio wrappers return the negated error code instead of setting
    // errno, so the error number is -ret; the global errno would be stale.
    LOG_WARN("io_setup failed (ret=%d, errno=%d); prefetch disabled on thread",
             ret, -ret);
  }
}

// Destroys the AIO context if (and only if) the constructor managed to
// create one, and marks the object invalid afterwards.
ScopedIOContext::~ScopedIOContext() {
  if (!valid_) {
    return;
  }
  io_destroy(ctx_);
  valid_ = false;
}

int execute_aligned_io(IOContext ctx, int fd, std::vector<AlignedRead> &reads,
uint64_t n_retries) {
if (reads.empty()) return 0;

std::vector<iocb> cbs(reads.size());
std::vector<iocb *> cb_ptrs(reads.size());
std::vector<io_event> events(reads.size());

for (size_t i = 0; i < reads.size(); ++i) {
io_prep_pread(&cbs[i], fd, reads[i].buf, reads[i].len,
static_cast<long long>(reads[i].offset));
cb_ptrs[i] = &cbs[i];
}

int n_submitted = 0;
int total = static_cast<int>(reads.size());

for (uint64_t attempt = 0; attempt <= n_retries; ++attempt) {
while (n_submitted < total) {
int batch = std::min(total - n_submitted, kMaxEvents);
int ret = io_submit(ctx, batch, cb_ptrs.data() + n_submitted);
if (ret >= 0) {
n_submitted += ret;
} else if (ret == -EINTR) {
continue;
} else if (ret == -EAGAIN) {
continue;
} else {
LOG_WARN("io_submit failed: ret=%d, errno=%d; fallback to pread", ret,
errno);
return execute_io_pread(fd, reads);
}
}
if (n_submitted == total) break;
}

if (n_submitted != total) {
LOG_WARN("io_submit incomplete after retries; fallback to pread");
return execute_io_pread(fd, reads);
}

int n_collected = 0;
while (n_collected < total) {
int ret =
io_getevents(ctx, total - n_collected, total - n_collected, events.data() + n_collected, nullptr);
if (ret > 0) {
n_collected += ret;
} else if (ret == -EINTR) {
continue;
} else {
LOG_WARN("io_getevents failed: ret=%d; fallback to pread", ret);
return execute_io_pread(fd, reads);
}
}

for (int i = 0; i < total; ++i) {
if (static_cast<uint64_t>(events[i].res) != reads[i].len) {
LOG_WARN(
"aio short read: expected=%llu, got=%lld; fallback to pread",
(unsigned long long)reads[i].len, (long long)events[i].res);
return execute_io_pread(fd, reads);
}
}

return 0;
}

} // namespace ailego
} // namespace zvec

#else

namespace zvec {
namespace ailego {

// Build without libaio support: no AIO context is ever created, so the
// object is permanently invalid and there is nothing to tear down.
ScopedIOContext::ScopedIOContext() { valid_ = false; }
ScopedIOContext::~ScopedIOContext() {}

// Variant used when libaio is not available: the context and retry count are
// ignored and every request is served by a plain pread() on `fd`, in order.
// Returns 0 when all requests were read in full, -1 at the first request
// whose pread() did not return exactly `len` bytes.
// NOTE(review): ::pread/ssize_t are used here although <unistd.h> is only
// included in the libaio branch of this file -- confirm the header brings
// them in on every platform this branch is compiled for.
int execute_aligned_io(IOContext, int fd, std::vector<AlignedRead> &reads,
                       uint64_t) {
  for (size_t idx = 0; idx < reads.size(); ++idx) {
    AlignedRead &req = reads[idx];
    const ssize_t n_read =
        ::pread(fd, req.buf, req.len, static_cast<off_t>(req.offset));
    if (n_read != static_cast<ssize_t>(req.len)) {
      return -1;
    }
  }
  return 0;
}

} // namespace ailego
} // namespace zvec

#endif
Loading