From aa31e3ec287e951cce2c06e33825d76217bfe8c6 Mon Sep 17 00:00:00 2001 From: Proteet Paul Date: Thu, 5 Feb 2026 05:24:52 -0600 Subject: [PATCH 01/11] Initial commit with io-uring --- src/CMakeLists.txt | 6 + src/async_stages.h | 482 +++++++++++++++++++++++++++++++++++++++++++ src/config.h | 3 + src/io_uring_utils.h | 128 ++++++++++++ src/main.cpp | 7 + src/sorted_run.h | 6 - src/sorter.h | 123 +++++++++-- 7 files changed, 737 insertions(+), 18 deletions(-) create mode 100644 src/async_stages.h create mode 100644 src/io_uring_utils.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 656599c..ac7be30 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,12 +20,18 @@ endif() find_package(OpenMP REQUIRED) +find_library(LIBURING_LIBRARIES uring) +find_path(LIBURING_INCLUDE_DIRS liburing.h) + # Add executable add_executable(sorter main.cpp) # target_compile_options(sorter PRIVATE -fopenmp) target_link_libraries(sorter PUBLIC OpenMP::OpenMP_CXX) +target_include_directories(sorter PRIVATE ${LIBURING_INCLUDE_DIRS}) +target_link_libraries(sorter PRIVATE ${LIBURING_LIBRARIES}) + # Enable additional optimizations # target_compile_options(sorter PRIVATE -march=native -mtune=native) target_compile_options(sorter PRIVATE -mavx512f -mavx512vl) diff --git a/src/async_stages.h b/src/async_stages.h new file mode 100644 index 0000000..4d7e3b3 --- /dev/null +++ b/src/async_stages.h @@ -0,0 +1,482 @@ +#pragma once + +/** + * Task 1: Fuse Phase 1 (read input) and Phase 2 (extract keys) using io_uring. + * + * Reads a run in small chunks via io_uring. When each chunk completes, keys + * are extracted into (key, index) pairs and the chunk is copied into the run + * buffer. Prefetching is done by submitting the next read as soon as a slot + * is free (queue depth controls in-flight reads). 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include "merge.h" +#include "io_uring_utils.h" +#include "key_value_pair.h" +#include "spdlog/spdlog.h" + + +constexpr uint64_t DEFAULT_READ_CHUNK_BYTES = 512 * 1024; // 512 KB + +constexpr uint32_t PREFETCH_DEPTH = 4; + +inline uint64_t align_up_block(uint64_t size) { + const uint64_t mask = io_uring_utils::BLOCK_ALIGN - 1; + return (size + mask) & ~mask; +} + +/** + * Extract keys from a chunk of raw record bytes into key_index_pairs. + * Record layout: key (8 bytes) then value. Keys are byte-swapped to match + * generate_key_index_pairs. + */ +template +void extract_keys_from_chunk( + const void* input_buf, + uint64_t chunk_id, + uint64_t num_records_in_chunk, + KeyValuePair* key_index_pairs +) { + constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + uint64_t offset = chunk_id * num_records_in_chunk; + static_assert(RecordType::KEY_LENGTH == 8, "Size of key should be 8 bytes"); + for (uint64_t i = 0; i < num_records_in_chunk; ++i) { + uint64_t idx = offset + i; + const uint8_t* ptr = (uint8_t*)input_buf + chunk_id * num_records_in_chunk * ELEM_SIZE; + key_index_pairs[idx].key = __builtin_bswap64(*((uint64_t*)ptr)); + key_index_pairs[idx].set_value(&idx); + } +} + +inline std::pair process_cqe(io_uring_cqe *cqe) { + int res = io_uring_utils::UringRing::cqe_result(cqe); + uint64_t user_data = io_uring_utils::UringRing::cqe_user_data(cqe); + return std::make_pair<>(user_data, res); +} + +// TODO(): Rewrite this as a class +/** + * Read one run from fd using io_uring in small chunks, and extract keys into + * key_index_pairs. Run data is written into run_buffer (must be at least + * run_size_bytes). key_index_pairs is resized and filled for the run. + * + * Uses a single io_uring ring with READ_QUEUE_DEPTH in-flight reads. + * read_chunk_bytes: size of each read chunk (will be aligned to BLOCK_ALIGN). 
+ */ +template +void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, + uint8_t* run_buffer, + KeyValuePair* key_index_pairs +) { + using KeyIndexPair = KeyValuePair; + constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + + assert(Config::BLOCK_SIZE_ALIGN % ELEM_SIZE == 0); + assert(run_size_bytes % ELEM_SIZE == 0); + + uint64_t num_records = run_size_bytes / ELEM_SIZE; + + uint64_t read_chunk_bytes = DEFAULT_READ_CHUNK_BYTES; + + uint64_t file_offset = run_id * run_size_bytes; + uint64_t num_chunks = (run_size_bytes + read_chunk_bytes - 1) / read_chunk_bytes; + + io_uring_utils::UringRing ring(PREFETCH_DEPTH); + + std::queue free_slots; + for (uint32_t i = 0; i < PREFETCH_DEPTH; ++i) { + free_slots.push(i); + } + + auto start = std::chrono::high_resolution_clock::now(); + uint64_t chunk_id = 0; + uint64_t completed = 0; + + while (completed < num_chunks) { + while (chunk_id < num_chunks && !free_slots.empty()) { + uint32_t slot_id = free_slots.front(); + free_slots.pop(); + + uint64_t chunk_offset = chunk_id * read_chunk_bytes; + void *buf = run_buffer + chunk_offset; + uint64_t chunk_bytes = std::min(read_chunk_bytes, run_size_bytes - chunk_offset); + uint64_t read_size = align_up_block(chunk_bytes); + + bool ok = ring.prepare_read(fd, buf, static_cast(read_size), + file_offset + chunk_offset, + (chunk_id << 32) | slot_id); + if (!ok) { + spdlog::error("submit_read failed for chunk {}", chunk_id); + free_slots.push(slot_id); + break; + } + chunk_id++; + } + ring.submit_and_wait(1); + + struct io_uring_cqe* cqe = nullptr; + if (!ring.wait_cqe(&cqe)) { + spdlog::error("wait_cqe failed"); + break; + } + + auto p = process_cqe(cqe); + uint32_t chunk_id = p.first >> 32; + uint32_t slot_id = p.first & 0xffff; + if (p.second < 0) { + spdlog::error("read chunk {} failed: {}", chunk_id, p.second); + free_slots.push(slot_id); + ring.mark_cqe_seen(cqe); + completed++; + continue; + } + + uint64_t done_chunk_offset = chunk_id * read_chunk_bytes; + 
uint64_t done_chunk_bytes = std::min(read_chunk_bytes, run_size_bytes - done_chunk_offset); + uint64_t num_records_chunk = done_chunk_bytes / ELEM_SIZE; + + extract_keys_from_chunk( + run_buffer, chunk_id, num_records_chunk, key_index_pairs + ); + + free_slots.push(slot_id); + ring.mark_cqe_seen(cqe); + completed++; + } + + auto end = std::chrono::high_resolution_clock::now(); + auto time_elapsed = std::chrono::duration_cast(end - start).count() / 1000.0f; + + spdlog::debug("Run {}: read+extract {} ms, {} chunks", run_id, time_elapsed, num_chunks); +} + +/** +* Accumulates values into in-memory buffers and writes them out in batches. This ensures overlap between cpu and io, while keeping the memory footprint low. +*/ +template +class ValueWriterPostSort { + using KeyIndexPair = KeyValuePair; + + int fd; // Intermediate file containing values + + int thread_idx; // Each thread writes to a non-overlapping portion of the file + + uint64_t values_per_chunk; // Number of values per thread + + uint8_t *input_buffer; // Buffer containing key-value pairs + + int run_idx; // This writer is for the i'th sorted run + + std::vector write_bufs; // Set of pre-allocated buffers used for writing out + + std::unique_ptr ring; + + KeyIndexPair* key_index_pairs; // Sorted key-index pairs + +public: + static constexpr uint64_t WRITE_IO_BYTES = RecordType::VALUE_LENGTH * io_uring_utils::BLOCK_ALIGN; + + static constexpr uint32_t NUM_SLOTS = PREFETCH_DEPTH * 4; + + static constexpr uint32_t BATCH_SIZE = 1; + + explicit ValueWriterPostSort(int fd, int thread_idx, uint64_t values_per_chunk, + uint8_t* input_buffer, int run_idx, + KeyIndexPair* key_index_pairs + ): thread_idx(thread_idx), values_per_chunk(values_per_chunk), + run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs) { + fd = dup(fd); + ring = std::make_unique(PREFETCH_DEPTH); + write_bufs.resize(NUM_SLOTS); + for (uint32_t i=0; i= 0) { + close(fd); + } + for (uint32_t i = 0; i < write_bufs.size(); i++) 
{ + free(write_bufs[i]); + } + } + + void write_values_to_buf(uint32_t buf_slot) { + uint64_t num_values = WRITE_IO_BYTES / RecordType::VALUE_LENGTH; + uint8_t *buf = (uint8_t*)write_bufs[buf_slot]; + for (uint64_t i=0; i slots; + for (uint32_t i=0; iprepare_write(fd, buf, WRITE_IO_BYTES, file_offset, user_data); + to_submit++; + next_write++; + } + + if (slots.empty() || to_submit >= BATCH_SIZE) { + // Wait for completions only if no more cpu work can be done + ring->submit_and_wait(slots.empty()); + to_submit = 0; + + if (!slots.empty()) { + continue; + } + + struct io_uring_cqe* cqe = nullptr; + if (!ring->wait_cqe(&cqe)) { + spdlog::error("wait_cqe failed"); + break; + } + auto p = process_cqe(cqe); + uint32_t slot_id = (uint32_t) (p.first & 0xffff); + slots.push(slot_id); + int result = p.second; + } + } + } +}; + +class AsyncValueReader { + enum BufState { + Empty, + IoCompleted, + WaitingForIO, + }; + + int fd; + uint64_t file_offset; // Offset within the file + uint64_t end_file_offset; + int reader_id; + + // TODO(): Maybe the number of buffers can be generalized?? 
+ void *ptr[2]; + BufState states[2]; + uint64_t chunk_offset; // Offset within the in-memory buffer that is being used for reads + uint64_t read_chunk_size; + uint64_t value_length_bytes; // Size of individual values + uint64_t cur_buf_idx; + +public: + inline bool waiting_for_io() { + return states[cur_buf_idx] == BufState::Empty; + } + + AsyncValueReader(int fd, uint64_t start_offset, uint64_t value_length, uint64_t read_chunk_size, int reader_id): + fd(fd), file_offset(start_offset), chunk_offset(0ll), value_length_bytes(value_length), reader_id(reader_id) { + for (int i=0; i<2; i++) { + posix_memalign(&ptr[i], 4096, read_chunk_size); + states[i] = BufState::Empty; + } + cur_buf_idx = 0; + } + + AsyncValueReader(const AsyncValueReader&) = delete; + AsyncValueReader& operator=(const AsyncValueReader&) = delete; + + ~AsyncValueReader() { + close(fd); + free(ptr[0]); + free(ptr[1]); + } + + inline void *get_next_value() { + if (chunk_offset >= read_chunk_size) { + states[cur_buf_idx] = BufState::Empty; + cur_buf_idx = 1 ^ cur_buf_idx; + if (states[cur_buf_idx] != BufState::IoCompleted) { + return nullptr; + } + chunk_offset = 0ll; + } + void *res = (uint8_t*) ptr[cur_buf_idx] + chunk_offset; + chunk_offset += value_length_bytes; + return res; + } + + inline void process_io_completion(uint64_t user_data) { + int buf_idx = (int)(user_data & 1); + states[buf_idx] = IoCompleted; + file_offset += read_chunk_size; + } + + inline std::optional get_next_io() { + uint64_t next_buf_idx = cur_buf_idx ^ 1; + if (states[next_buf_idx] == IoCompleted) { + return std::nullopt; + } + uint64_t user_data = (cur_buf_idx << 16) | next_buf_idx; + io_uring_utils::ReadTask task { + ptr[next_buf_idx], read_chunk_size, + fd, file_offset, user_data + }; + return task; + } +}; + + +template +class ValueWriterPostMerge { + using KeyIndexPair = KeyValuePair; + + static constexpr uint64_t WRITE_IO_BYTES = 120 * 1024; + + static constexpr uint64_t READ_IO_CHUNK = 120 * 1024; + + int out_fd; // 
Output file containing key-value pairs + + MergeTask *task; + + std::vector write_bufs; // Set of pre-allocated buffers used for writing out + + std::queue slots; + + std::unique_ptr ring; + + std::vector> readers; + +public: + static constexpr uint32_t NUM_SLOTS = PREFETCH_DEPTH * 4; + + static constexpr uint32_t BATCH_SIZE = 1; + + explicit ValueWriterPostMerge(MergeTask *task, int out_fd, + std::vector &in_fds, std::vector &start_ptrs) + : task(task) { + assert(in_fds.size() == start_ptrs.size()); + assert(WRITE_IO_BYTES % RecordType::VALUE_LENGTH == 0); + ring = std::make_unique(PREFETCH_DEPTH); + this->out_fd = dup(out_fd); + + for (int i=0; istart_ptrs[i]) * RecordType::VALUE_LENGTH; + auto reader = std::make_unique(fd, file_offset, RecordType::VALUE_LENGTH, READ_IO_CHUNK, i); + readers.push_back(std::move(reader)); + } + + write_bufs.resize(NUM_SLOTS); + for (uint32_t i=0; iwait_cqe(&cqe); + ring->peek_cqe(&cqe); + if ((cqe->user_data >> 32) < NUM_SLOTS) { + uint32_t slot = (cqe->user_data >> 32); + slots.push(slot); + } else { + int reader_idx = cqe->user_data & 0xffff; + readers[reader_idx]->process_io_completion(cqe->user_data); + auto task = readers[reader_idx]->get_next_io(); + if (task.has_value()) { + task.value().user_data |= reader_idx; + ring->prepare_read(task.value()); + } + } + ring->mark_cqe_seen(cqe); + } + + void run() { + for (uint32_t i=0; ioutput; + + int i = 0; + for (auto &reader: readers) { + auto task = reader->get_next_io(); + if (task.has_value()) { + task.value().user_data |= i; + ring->prepare_read(task.value()); + } + i++; + } + ring->submit_and_wait(0); + uint64_t out_file_offset = 0ll; + + while (records_emitted < task->total_records_sorted) { + while (slots.empty()) { + poll_completions(); + } + uint32_t slot = slots.front(); + slots.pop(); + uint64_t records_to_write = records_emitted + std::min(task->total_records_sorted - records_emitted, WRITE_IO_BYTES / sizeof(RecordType)); + RecordType *out_buf = 
reinterpret_cast(write_bufs[slot]); + uint64_t offset = 0; + + while (records_emitted < records_to_write) { + out_buf->key = __builtin_bswap64(sorted_keys->key); + uint32_t stream_id = sorted_keys->value; + void *value = readers[stream_id]->get_next_value(); + if (value == nullptr) [[unlikely]] { + while (readers[stream_id]->waiting_for_io()) { + poll_completions(); + } + } + std::memcpy(&out_buf->value, value, RecordType::VALUE_LENGTH); + out_buf++; + sorted_keys++; + records_emitted++; + } + ring->prepare_write(out_fd, write_bufs[slot], records_to_write * sizeof(RecordType), + out_file_offset, (uint64_t)slot << 32); + ring->submit_and_wait(0); + out_file_offset += records_to_write * sizeof(RecordType); + } + } +}; diff --git a/src/config.h b/src/config.h index 57f77ea..a8770d5 100644 --- a/src/config.h +++ b/src/config.h @@ -25,6 +25,9 @@ struct Config { bool use_std_sort; + /** If true, use io_uring for phase 1+2 (read + extract keys). Requires liburing. */ + bool use_async{false}; + inline uint64_t num_runs() { return (file_size_bytes + run_size_bytes - 1) / run_size_bytes; } diff --git a/src/io_uring_utils.h b/src/io_uring_utils.h new file mode 100644 index 0000000..b9c412f --- /dev/null +++ b/src/io_uring_utils.h @@ -0,0 +1,128 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace io_uring_utils { + +constexpr uint32_t BLOCK_ALIGN = 4096; + +constexpr bool POLL = true; + +struct ReadTask { + void *buf; + uint64_t bytes; + int fd; + uint64_t offset; + uint64_t user_data; +}; + +/** + * Per-thread io_uring context + */ +struct UringRing { + struct io_uring ring_; + uint32_t queue_depth_; + uint32_t pending_; // number of submitted but not yet reaped + + explicit UringRing(uint32_t queue_depth) + : queue_depth_(queue_depth), pending_(0) { + unsigned int flags = 0; + // flags |= IORING_SETUP_DEFER_TASKRUN | IORING_SETUP_SINGLE_ISSUER; + if (POLL) { + flags |= IORING_SETUP_IOPOLL; + } + + int ret = 
io_uring_queue_init(queue_depth, &ring_, flags); + assert(ret >= 0 && "io_uring_queue_init failed"); + } + + ~UringRing() { + io_uring_queue_exit(&ring_); + } + + UringRing(const UringRing&) = delete; + UringRing& operator=(const UringRing&) = delete; + + struct io_uring* get() { return &ring_; } + + bool prepare_read(int fd, void* buf, uint32_t len, uint64_t file_offset, uint64_t user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) return false; + io_uring_prep_read(sqe, fd, buf, len, file_offset); + io_uring_sqe_set_data(sqe, reinterpret_cast(static_cast(user_data))); + io_uring_sqe_set_flags(sqe, 0); + pending_++; + return true; + } + + bool prepare_read(ReadTask &task) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) return false; + io_uring_prep_read(sqe, task.fd, task.buf, task.bytes, task.offset); + io_uring_sqe_set_data(sqe, reinterpret_cast(static_cast(task.user_data))); + io_uring_sqe_set_flags(sqe, 0); + pending_++; + return true; + } + + bool prepare_write(int fd, void *buf, uint32_t len, uint64_t file_offset, uint64_t user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) return false; + io_uring_prep_write(sqe, fd, buf, len, file_offset); + io_uring_sqe_set_data(sqe, reinterpret_cast(static_cast(user_data))); + io_uring_sqe_set_flags(sqe, 0); + pending_++; + return true; + } + + /** Submit previously prepared SQEs to the kernel. */ + int submit_and_wait(int wait_nr) { + int n = io_uring_submit_and_wait(&ring_, wait_nr); + return n; + } + + bool wait_cqe(struct io_uring_cqe** cqe_ptr) { + int ret = io_uring_wait_cqe(&ring_, cqe_ptr); + if (ret < 0) return false; + return true; + } + + /** + * Peek at a completion without removing it. Returns true if one is available. 
+ */ + bool peek_cqe(struct io_uring_cqe** cqe_ptr) { + return io_uring_peek_cqe(&ring_, cqe_ptr) == 0; + } + + void mark_cqe_seen(struct io_uring_cqe* cqe) { + io_uring_cqe_seen(&ring_, cqe); + if (pending_ > 0) pending_--; + } + + static uint64_t cqe_user_data(struct io_uring_cqe* cqe) { + return static_cast(reinterpret_cast(io_uring_cqe_get_data(cqe))); + } + + static int cqe_result(struct io_uring_cqe* cqe) { + return cqe->res; + } +}; + +/** + * Allocate a buffer suitable for O_DIRECT and io_uring reads. + * Caller must free with aligned_free. + */ +inline void* aligned_alloc_read_buffer(uint64_t size) { + assert(size > 0 && (size % BLOCK_ALIGN) == 0); + void* p = nullptr; + int ret = posix_memalign(&p, BLOCK_ALIGN, size); + assert(ret == 0); + return p; +} + +} // namespace io_uring_utils diff --git a/src/main.cpp b/src/main.cpp index c037e8d..b13c76c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -21,6 +21,7 @@ struct ParsedArgs { uint32_t num_threads; bool separate_values; bool use_std_sort; + bool use_async; }; size_t parseSizeString(const std::string& sizeStr) { @@ -105,6 +106,7 @@ void printHelp(const char* program_name) { std::cout << " --memory-size Memory size with unit (default: 100M)\n"; std::cout << " --read-chunk-size Read chunk size with unit (default: 100M)\n"; std::cout << " --num-threads Number of threads for parallel sorting (default: 1)\n"; + std::cout << " --use-async Use io_uring\n"; std::cout << " --help, -h Show this help message\n"; std::cout << "\nSize units: B (bytes), K (KB), M (MB), G (GB)\n"; std::cout << "Examples: 1M, 512K, 2G, 1024B\n"; @@ -119,6 +121,7 @@ int parseArguments(int argc, char* argv[], ParsedArgs& args) { args.memory_size = parseSizeString("100M"); // 100MB default args.num_threads = 1; // 1 thread default args.separate_values = false; + args.use_async = false; // Parse command line arguments for (int i = 1; i < argc; i++) { @@ -153,6 +156,9 @@ int parseArguments(int argc, char* argv[], ParsedArgs& args) { else if 
(arg == "--use-std-sort") { args.use_std_sort = true; } + else if (arg == "--use-async") { + args.use_async = true; + } else { std::cerr << "Unknown argument: " << arg << std::endl; std::cerr << "Use --help for usage information." << std::endl; @@ -200,6 +206,7 @@ int main(int argc, char* argv[]) { config.intermediate_file_prefix = args.working_dir + "/intermediate-" + file_size_str; config.separate_values = args.separate_values; config.use_std_sort = args.use_std_sort; + config.use_async = args.use_async; if (args.key_size == 8 && args.value_size == 8) { Sorter> sorter(std::move(config)); diff --git a/src/sorted_run.h b/src/sorted_run.h index e83ec75..1af016b 100644 --- a/src/sorted_run.h +++ b/src/sorted_run.h @@ -16,11 +16,6 @@ struct SortedRun { uint64_t num_elements; }; -enum ReaderState { - WaitingForIO, - Ready, -}; - template class SortedRunReader { static constexpr uint32_t ELEM_SIZE = sizeof(RecordType); @@ -49,7 +44,6 @@ class SortedRunReader { // uint32_t next_chunk; - // ReaderState state; public: // IoTask* get_next_io_task() { diff --git a/src/sorter.h b/src/sorter.h index 2222c13..2e9e426 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -22,12 +22,15 @@ #include +#include "io_uring_utils.h" #include "perf_utils.h" #include "key_value_pair.h" #include "sorted_run.h" #include "config.h" #include "merge.h" #include "read_values.h" +#include "async_stages.h" + #define _REENTRANT #include "ips4o.hpp" @@ -54,7 +57,7 @@ struct TimingInfo { }; -void write_to_disk(void *buf, uint64_t file_offset, uint64_t bytes, int fd) { +inline void write_to_disk(void *buf, uint64_t file_offset, uint64_t bytes, int fd) { uint64_t offset = 0ll; while (bytes > 0) { uint64_t bytes_to_write = std::min(bytes, MAX_IO_CHUNK_SIZE); @@ -111,9 +114,16 @@ class Sorter { void generate_run_for_merge_sort(std::vector &run, void *sorted_values, std::vector &key_index_pairs, int run_id); + /** Same as generate_run_for_merge_sort but assumes key_index_pairs already filled (e.g. by Task1). 
*/ + void generate_run_for_merge_sort_after_keys(std::vector &run, + void *sorted_values, std::vector &key_index_pairs, int run_id); + void write_back_values_post_merge(std::vector &fds, std::vector> &tasks, std::vector> &key_index_pairs); + void write_back_values_post_merge_async(std::vector &fds, std::vector> &tasks, + std::vector> &key_index_pairs); + public: Sorter(Config &&config) { this->config = std::move(config); @@ -359,6 +369,27 @@ void Sorter::generate_run_for_merge_sort( generate_key_index_pairs(run, key_index_pairs, nullptr, false); in_place_sort(key_index_pairs); + auto start = std::chrono::high_resolution_clock::now(); + #pragma omp parallel for num_threads(config.num_threads) + for (int i=0; i(end - start).count() / 1000.0f; +} + +template +void Sorter::generate_run_for_merge_sort_after_keys( + std::vector &run, + void *sorted_values, std::vector &key_index_pairs, int run_id) { + assert(config.separate_values); + assert((run.size() * sizeof(RecordType)) % 64 == 0); + assert(key_index_pairs.size() == run.size()); + + in_place_sort(key_index_pairs); + auto start = std::chrono::high_resolution_clock::now(); #pragma omp parallel for num_threads(config.num_threads) for (int i=0; i::sort() { std::vector> key_index_pairs(num_runs); std::vector fds; + std::vector v; + v.resize(config.run_size_bytes / ELEM_SIZE); + memset(v.data(), 0, config.run_size_bytes); + void *sorted_values; + int ret = posix_memalign(&sorted_values, 4096, RecordType::VALUE_LENGTH * v.size()); + assert(ret == 0); + memset(sorted_values, 0, RecordType::VALUE_LENGTH * v.size()); + for (int i=0; i( + read_fd, j, config.run_size_bytes / config.num_threads, + reinterpret_cast(v.data()), key_index_pairs[i].data()); + } + in_place_sort(key_index_pairs[i]); + + std::string file_name = config.intermediate_file_prefix + "-chunk-" + std::to_string(i); + int write_fd = open(file_name.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_DIRECT, 0644); + assert(write_fd != -1); + + posix_fallocate64(write_fd, 0, 
RecordType::VALUE_LENGTH * v.size()); + + uint64_t values_per_chunk = v.size() / config.num_threads; + assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0); + std::unique_ptr> writers[config.num_threads]; + + for (int j = 0; j < config.num_threads; j++) { + KeyIndexPair* key_index_buf = key_index_pairs[i].data() + j * values_per_chunk; + writers[i] = std::make_unique>( + write_fd, j, values_per_chunk, reinterpret_cast(v.data()), i, key_index_buf + ); + } + + #pragma omp parallel for num_threads(config.num_threads) + for (int j=0; jrun(); + } + } } std::vector> tasks; merge(key_index_pairs, config.run_size_bytes / sizeof(RecordType), &tasks); - write_back_values_post_merge(fds, tasks, key_index_pairs); + if (config.use_async) { + write_back_values_post_merge_async(fds, tasks, key_index_pairs); + } else { + write_back_values_post_merge(fds, tasks, key_index_pairs); + } } template @@ -563,3 +634,31 @@ void Sorter::write_back_values_post_merge(std::vector &fds, spdlog::debug("Done writing back values post merge"); } + +template +void Sorter::write_back_values_post_merge_async(std::vector &fds, + std::vector> &tasks, + std::vector> &key_index_pairs) { + std::vector start_ptrs; + for (auto &v: key_index_pairs) { + start_ptrs.push_back(v.data()); + } + + std::unique_ptr> writers[config.num_threads]; + for (int i=0; i writer {&tasks[i], out_fd, fds, start_ptrs}; + // writers.emplace_back(std::move(writer)); + writers[i] = std::make_unique>(&tasks[i], out_fd, fds, start_ptrs); + close(out_fd); + } + + // #pragma omp parallel for num_threads(config.num_threads) + // for (int i=0; irun(); + // } +} \ No newline at end of file From f55151c81d99324b9b6d076233fde31978e6516c Mon Sep 17 00:00:00 2001 From: Proteet Paul Date: Thu, 5 Feb 2026 06:48:43 -0600 Subject: [PATCH 02/11] Changes suggested by AI review --- src/async_stages.h | 86 ++++++++++++++++++++++++---------------------- src/sorter.h | 22 ++++++------ 2 files changed, 56 insertions(+), 52 deletions(-) diff --git 
a/src/async_stages.h b/src/async_stages.h index 4d7e3b3..0d3e490 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -49,13 +49,17 @@ void extract_keys_from_chunk( KeyValuePair* key_index_pairs ) { constexpr uint32_t ELEM_SIZE = sizeof(RecordType); - uint64_t offset = chunk_id * num_records_in_chunk; static_assert(RecordType::KEY_LENGTH == 8, "Size of key should be 8 bytes"); + + const uint64_t num_records_per_full_chunk = DEFAULT_READ_CHUNK_BYTES / ELEM_SIZE; + uint64_t idx = chunk_id * num_records_per_full_chunk; + auto ptr = (RecordType*) input_buf + chunk_id * num_records_per_full_chunk; + for (uint64_t i = 0; i < num_records_in_chunk; ++i) { - uint64_t idx = offset + i; - const uint8_t* ptr = (uint8_t*)input_buf + chunk_id * num_records_in_chunk * ELEM_SIZE; - key_index_pairs[idx].key = __builtin_bswap64(*((uint64_t*)ptr)); + key_index_pairs[idx].key = __builtin_bswap64(ptr->key); key_index_pairs[idx].set_value(&idx); + ptr++; + idx++; } } @@ -94,33 +98,26 @@ void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, io_uring_utils::UringRing ring(PREFETCH_DEPTH); - std::queue free_slots; - for (uint32_t i = 0; i < PREFETCH_DEPTH; ++i) { - free_slots.push(i); - } - auto start = std::chrono::high_resolution_clock::now(); uint64_t chunk_id = 0; uint64_t completed = 0; + uint32_t free_slots = PREFETCH_DEPTH; while (completed < num_chunks) { - while (chunk_id < num_chunks && !free_slots.empty()) { - uint32_t slot_id = free_slots.front(); - free_slots.pop(); - - uint64_t chunk_offset = chunk_id * read_chunk_bytes; + while (chunk_id < num_chunks && free_slots > 0) { + uint64_t chunk_offset = chunk_id * DEFAULT_READ_CHUNK_BYTES; void *buf = run_buffer + chunk_offset; - uint64_t chunk_bytes = std::min(read_chunk_bytes, run_size_bytes - chunk_offset); + uint64_t chunk_bytes = std::min(DEFAULT_READ_CHUNK_BYTES, run_size_bytes - chunk_offset); uint64_t read_size = align_up_block(chunk_bytes); bool ok = ring.prepare_read(fd, buf, 
static_cast(read_size), file_offset + chunk_offset, - (chunk_id << 32) | slot_id); + chunk_id << 32); if (!ok) { spdlog::error("submit_read failed for chunk {}", chunk_id); - free_slots.push(slot_id); break; } + free_slots--; chunk_id++; } ring.submit_and_wait(1); @@ -133,24 +130,21 @@ void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, auto p = process_cqe(cqe); uint32_t chunk_id = p.first >> 32; - uint32_t slot_id = p.first & 0xffff; if (p.second < 0) { spdlog::error("read chunk {} failed: {}", chunk_id, p.second); - free_slots.push(slot_id); ring.mark_cqe_seen(cqe); completed++; continue; } + free_slots++; - uint64_t done_chunk_offset = chunk_id * read_chunk_bytes; - uint64_t done_chunk_bytes = std::min(read_chunk_bytes, run_size_bytes - done_chunk_offset); + uint64_t done_chunk_offset = chunk_id * DEFAULT_READ_CHUNK_BYTES; + uint64_t done_chunk_bytes = std::min(DEFAULT_READ_CHUNK_BYTES, run_size_bytes - done_chunk_offset); uint64_t num_records_chunk = done_chunk_bytes / ELEM_SIZE; extract_keys_from_chunk( run_buffer, chunk_id, num_records_chunk, key_index_pairs ); - - free_slots.push(slot_id); ring.mark_cqe_seen(cqe); completed++; } @@ -196,11 +190,11 @@ class ValueWriterPostSort { KeyIndexPair* key_index_pairs ): thread_idx(thread_idx), values_per_chunk(values_per_chunk), run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs) { - fd = dup(fd); + this->fd = dup(fd); ring = std::make_unique(PREFETCH_DEPTH); write_bufs.resize(NUM_SLOTS); for (uint32_t i=0; ipeek_cqe(&cqe)) { + auto p = process_cqe(cqe); + uint32_t slot_id = p.first >> 32; + uint32_t write_id = p.first & 0xFFFFFFFF; + if (p.second < 0) { + spdlog::error("write chunk {} failed: {}", write_id, p.second); + } + slots.push(slot_id); + completed++; + ring->mark_cqe_seen(cqe); + } if (!ring->wait_cqe(&cqe)) { spdlog::error("wait_cqe failed"); break; } auto p = process_cqe(cqe); - uint32_t slot_id = (uint32_t) (p.first & 0xffff); + uint32_t slot_id = 
p.first >> 32; slots.push(slot_id); - int result = p.second; + completed++; + ring->mark_cqe_seen(cqe); } } } @@ -345,7 +350,7 @@ class AsyncValueReader { if (states[next_buf_idx] == IoCompleted) { return std::nullopt; } - uint64_t user_data = (cur_buf_idx << 16) | next_buf_idx; + uint64_t user_data = (reader_id << 16) | next_buf_idx; io_uring_utils::ReadTask task { ptr[next_buf_idx], read_chunk_size, fd, file_offset, user_data @@ -376,7 +381,7 @@ class ValueWriterPostMerge { std::vector> readers; public: - static constexpr uint32_t NUM_SLOTS = PREFETCH_DEPTH * 4; + static constexpr uint64_t NUM_SLOTS = PREFETCH_DEPTH * 4; static constexpr uint32_t BATCH_SIZE = 1; @@ -414,7 +419,7 @@ class ValueWriterPostMerge { void poll_completions() { struct io_uring_cqe *cqe; ring->wait_cqe(&cqe); - ring->peek_cqe(&cqe); + if ((cqe->user_data >> 32) < NUM_SLOTS) { uint32_t slot = (cqe->user_data >> 32); slots.push(slot); @@ -423,7 +428,7 @@ class ValueWriterPostMerge { readers[reader_idx]->process_io_completion(cqe->user_data); auto task = readers[reader_idx]->get_next_io(); if (task.has_value()) { - task.value().user_data |= reader_idx; + task.value().user_data |= (NUM_SLOTS << 32); ring->prepare_read(task.value()); } } @@ -441,7 +446,7 @@ class ValueWriterPostMerge { for (auto &reader: readers) { auto task = reader->get_next_io(); if (task.has_value()) { - task.value().user_data |= i; + task.value().user_data |= (NUM_SLOTS << 32); ring->prepare_read(task.value()); } i++; @@ -455,12 +460,12 @@ class ValueWriterPostMerge { } uint32_t slot = slots.front(); slots.pop(); - uint64_t records_to_write = records_emitted + std::min(task->total_records_sorted - records_emitted, WRITE_IO_BYTES / sizeof(RecordType)); + uint64_t num_records_in_batch = records_emitted + std::min(task->total_records_sorted - records_emitted, WRITE_IO_BYTES / sizeof(RecordType)); RecordType *out_buf = reinterpret_cast(write_bufs[slot]); uint64_t offset = 0; - while (records_emitted < records_to_write) { - 
out_buf->key = __builtin_bswap64(sorted_keys->key); + for (uint64_t i=0; ikey); uint32_t stream_id = sorted_keys->value; void *value = readers[stream_id]->get_next_value(); if (value == nullptr) [[unlikely]] { @@ -468,15 +473,14 @@ class ValueWriterPostMerge { poll_completions(); } } - std::memcpy(&out_buf->value, value, RecordType::VALUE_LENGTH); - out_buf++; + std::memcpy(&out_buf[i].value, value, RecordType::VALUE_LENGTH); sorted_keys++; - records_emitted++; } - ring->prepare_write(out_fd, write_bufs[slot], records_to_write * sizeof(RecordType), + ring->prepare_write(out_fd, write_bufs[slot], num_records_in_batch * sizeof(RecordType), out_file_offset, (uint64_t)slot << 32); ring->submit_and_wait(0); - out_file_offset += records_to_write * sizeof(RecordType); + out_file_offset += num_records_in_batch * sizeof(RecordType); + records_emitted += num_records_in_batch; } } }; diff --git a/src/sorter.h b/src/sorter.h index 2e9e426..4affd0f 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -443,7 +443,6 @@ void Sorter::sort() { fds.push_back( write_intermediate_values(sorted_values, v.size() * RecordType::VALUE_LENGTH, i) ); - free(sorted_values); } else { #pragma omp parallel for num_threads(config.num_threads) for (int j=0; j::sort() { uint64_t values_per_chunk = v.size() / config.num_threads; assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0); - std::unique_ptr> writers[config.num_threads]; + std::vector>> writers; for (int j = 0; j < config.num_threads; j++) { KeyIndexPair* key_index_buf = key_index_pairs[i].data() + j * values_per_chunk; - writers[i] = std::make_unique>( + auto writer = std::make_unique>( write_fd, j, values_per_chunk, reinterpret_cast(v.data()), i, key_index_buf ); + writers.push_back(std::move(writer)); } #pragma omp parallel for num_threads(config.num_threads) @@ -476,6 +476,7 @@ void Sorter::sort() { } } } + free(sorted_values); std::vector> tasks; merge(key_index_pairs, config.run_size_bytes / sizeof(RecordType), &tasks); if 
(config.use_async) { @@ -644,21 +645,20 @@ void Sorter::write_back_values_post_merge_async(std::vector &fd start_ptrs.push_back(v.data()); } - std::unique_ptr> writers[config.num_threads]; + std::vector>> writers; for (int i=0; i writer {&tasks[i], out_fd, fds, start_ptrs}; - // writers.emplace_back(std::move(writer)); - writers[i] = std::make_unique>(&tasks[i], out_fd, fds, start_ptrs); + auto writer = std::make_unique>(&tasks[i], out_fd, fds, start_ptrs); + writers.push_back(std::move(writer)); close(out_fd); } - // #pragma omp parallel for num_threads(config.num_threads) - // for (int i=0; irun(); - // } + #pragma omp parallel for num_threads(config.num_threads) + for (int i=0; irun(); + } } \ No newline at end of file From 1df5b91face23af7f0f292799c050ae6c5fe98f5 Mon Sep 17 00:00:00 2001 From: proteet Date: Thu, 5 Feb 2026 12:51:55 -0700 Subject: [PATCH 03/11] Some fixes --- run.sh | 9 ++++ src/async_stages.h | 132 +++++++++++++++++++++++++++++---------------- src/main.cpp | 2 +- src/merge.h | 6 +-- src/sorter.h | 14 ++++- 5 files changed, 112 insertions(+), 51 deletions(-) diff --git a/run.sh b/run.sh index 4438543..295484f 100755 --- a/run.sh +++ b/run.sh @@ -13,6 +13,7 @@ PROFILE_OUTPUT="perf.data" FLAMEGRAPH_OUTPUT="flamegraph.svg" SEPARATE_VALUES=false ENABLE_MEMORY_PROFILING=false +USE_ASYNC_IO=false PCM_MEMORY="/users/proteet/pcm/build/bin/pcm-memory" # Function to show usage @@ -88,6 +89,10 @@ while [[ $# -gt 0 ]]; do FLAMEGRAPH_OUTPUT="$2" shift 2 ;; + --use-async) + USE_ASYNC_IO=true + shift + ;; --help|-h) show_usage exit 0 @@ -151,6 +156,10 @@ if [ "$SEPARATE_VALUES" = true ]; then CMD_ARGS="$CMD_ARGS --separate-values" fi +if [ "$USE_ASYNC_IO" = true ]; then + CMD_ARGS="$CMD_ARGS --use-async" +fi + echo "Running sorter with parameters:" echo " File size: $FILE_SIZE" echo " Key size: $KEY_SIZE" diff --git a/src/async_stages.h b/src/async_stages.h index 0d3e490..38c3a38 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -43,7 +43,7 @@ inline 
uint64_t align_up_block(uint64_t size) { */ template void extract_keys_from_chunk( - const void* input_buf, + const RecordType* input_buf, uint64_t chunk_id, uint64_t num_records_in_chunk, KeyValuePair* key_index_pairs @@ -79,8 +79,8 @@ inline std::pair process_cqe(io_uring_cqe *cqe) { * read_chunk_bytes: size of each read chunk (will be aligned to BLOCK_ALIGN). */ template -void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, - uint8_t* run_buffer, +void read_run_and_extract_keys(int fd, uint64_t thread_id, uint64_t run_size_bytes, + RecordType* records, KeyValuePair* key_index_pairs ) { using KeyIndexPair = KeyValuePair; @@ -90,10 +90,9 @@ void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, assert(run_size_bytes % ELEM_SIZE == 0); uint64_t num_records = run_size_bytes / ELEM_SIZE; - uint64_t read_chunk_bytes = DEFAULT_READ_CHUNK_BYTES; - uint64_t file_offset = run_id * run_size_bytes; + uint64_t file_offset = thread_id * run_size_bytes; uint64_t num_chunks = (run_size_bytes + read_chunk_bytes - 1) / read_chunk_bytes; io_uring_utils::UringRing ring(PREFETCH_DEPTH); @@ -106,11 +105,11 @@ void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, while (completed < num_chunks) { while (chunk_id < num_chunks && free_slots > 0) { uint64_t chunk_offset = chunk_id * DEFAULT_READ_CHUNK_BYTES; - void *buf = run_buffer + chunk_offset; + auto *buf = records + chunk_offset / ELEM_SIZE; uint64_t chunk_bytes = std::min(DEFAULT_READ_CHUNK_BYTES, run_size_bytes - chunk_offset); uint64_t read_size = align_up_block(chunk_bytes); - bool ok = ring.prepare_read(fd, buf, static_cast(read_size), + bool ok = ring.prepare_read(fd, (void*)buf, static_cast(read_size), file_offset + chunk_offset, chunk_id << 32); if (!ok) { @@ -143,7 +142,7 @@ void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, uint64_t num_records_chunk = done_chunk_bytes / ELEM_SIZE; extract_keys_from_chunk( - 
run_buffer, chunk_id, num_records_chunk, key_index_pairs + records, chunk_id, num_records_chunk, key_index_pairs ); ring.mark_cqe_seen(cqe); completed++; @@ -152,7 +151,7 @@ void read_run_and_extract_keys(int fd, uint64_t run_id, uint64_t run_size_bytes, auto end = std::chrono::high_resolution_clock::now(); auto time_elapsed = std::chrono::duration_cast(end - start).count() / 1000.0f; - spdlog::debug("Run {}: read+extract {} ms, {} chunks", run_id, time_elapsed, num_chunks); + spdlog::debug("Run {}: read+extract {} ms, {} chunks", thread_id, time_elapsed, num_chunks); } /** @@ -194,7 +193,8 @@ class ValueWriterPostSort { ring = std::make_unique(PREFETCH_DEPTH); write_bufs.resize(NUM_SLOTS); for (uint32_t i=0; i slots; for (uint32_t i=0; iprepare_write(fd, buf, WRITE_IO_BYTES, file_offset, user_data); to_submit++; next_write++; } - if (slots.empty() || to_submit >= BATCH_SIZE) { - // Wait for completions only if no more cpu work can be done - ring->submit_and_wait(slots.empty()); + bool should_submit = slots.empty() || to_submit >= BATCH_SIZE || (next_write == num_writes); + if (should_submit) { + int nr = (next_write == num_writes) ? 
(num_writes - completed) : slots.empty(); + ring->submit_and_wait(nr); to_submit = 0; - if (!slots.empty()) { - continue; - } - struct io_uring_cqe* cqe = nullptr; while (ring->peek_cqe(&cqe)) { auto p = process_cqe(cqe); - uint32_t slot_id = p.first >> 32; - uint32_t write_id = p.first & 0xFFFFFFFF; + uint32_t slot_id = p.first; if (p.second < 0) { - spdlog::error("write chunk {} failed: {}", write_id, p.second); + spdlog::error("write chunk failed: {}", p.second); } slots.push(slot_id); completed++; ring->mark_cqe_seen(cqe); } - if (!ring->wait_cqe(&cqe)) { - spdlog::error("wait_cqe failed"); - break; - } - auto p = process_cqe(cqe); - uint32_t slot_id = p.first >> 32; - slots.push(slot_id); - completed++; - ring->mark_cqe_seen(cqe); } } + spdlog::debug("Done writing out values post sort"); } }; @@ -304,13 +293,16 @@ class AsyncValueReader { public: inline bool waiting_for_io() { - return states[cur_buf_idx] == BufState::Empty; + spdlog::debug("cur_buf_idx of stream {}: {}", reader_id, cur_buf_idx); + return states[cur_buf_idx] != BufState::IoCompleted; } AsyncValueReader(int fd, uint64_t start_offset, uint64_t value_length, uint64_t read_chunk_size, int reader_id): - fd(fd), file_offset(start_offset), chunk_offset(0ll), value_length_bytes(value_length), reader_id(reader_id) { + fd(fd), file_offset(start_offset), chunk_offset(0ll), value_length_bytes(value_length), reader_id(reader_id), + read_chunk_size(read_chunk_size) { for (int i=0; i<2; i++) { - posix_memalign(&ptr[i], 4096, read_chunk_size); + int ret = posix_memalign(&ptr[i], 4096, read_chunk_size); + assert(ret == 0); states[i] = BufState::Empty; } cur_buf_idx = 0; @@ -325,14 +317,32 @@ class AsyncValueReader { free(ptr[1]); } + inline void *get_next_value_fast() { + if (states[cur_buf_idx] != BufState::IoCompleted || chunk_offset >= read_chunk_size) { + return nullptr; + } + void *res = (uint8_t*) ptr[cur_buf_idx] + chunk_offset; + chunk_offset += value_length_bytes; + return res; + } + + inline bool 
need_submit() { + return states[cur_buf_idx] == BufState::IoCompleted || states[cur_buf_idx ^ 1] == BufState::Empty; + } + inline void *get_next_value() { + if (states[cur_buf_idx] != BufState::IoCompleted) { + return nullptr; + } if (chunk_offset >= read_chunk_size) { + spdlog::debug("Here: {}", this->reader_id); states[cur_buf_idx] = BufState::Empty; + cur_buf_idx = 1 ^ cur_buf_idx; + chunk_offset = 0ll; if (states[cur_buf_idx] != BufState::IoCompleted) { return nullptr; } - chunk_offset = 0ll; } void *res = (uint8_t*) ptr[cur_buf_idx] + chunk_offset; chunk_offset += value_length_bytes; @@ -341,20 +351,23 @@ class AsyncValueReader { inline void process_io_completion(uint64_t user_data) { int buf_idx = (int)(user_data & 1); + spdlog::debug("buf_idx of completed IO: {}", buf_idx); states[buf_idx] = IoCompleted; - file_offset += read_chunk_size; } inline std::optional get_next_io() { - uint64_t next_buf_idx = cur_buf_idx ^ 1; + uint64_t next_buf_idx = (states[cur_buf_idx] == Empty) ? cur_buf_idx: cur_buf_idx ^ 1; + // uint64_t next_buf_idx = cur_buf_idx ^ 1; if (states[next_buf_idx] == IoCompleted) { return std::nullopt; } + states[next_buf_idx] = WaitingForIO; uint64_t user_data = (reader_id << 16) | next_buf_idx; io_uring_utils::ReadTask task { ptr[next_buf_idx], read_chunk_size, fd, file_offset, user_data }; + file_offset += read_chunk_size; return task; } }; @@ -389,6 +402,7 @@ class ValueWriterPostMerge { std::vector &in_fds, std::vector &start_ptrs) : task(task) { assert(in_fds.size() == start_ptrs.size()); + assert(in_fds.size() > 0 && "in_fds must not be empty"); assert(WRITE_IO_BYTES % RecordType::VALUE_LENGTH == 0); ring = std::make_unique(PREFETCH_DEPTH); this->out_fd = dup(out_fd); @@ -402,7 +416,8 @@ class ValueWriterPostMerge { write_bufs.resize(NUM_SLOTS); for (uint32_t i=0; iwait_cqe(&cqe); + if (!ring->peek_cqe(&cqe)) { + ring->wait_cqe(&cqe); + } if ((cqe->user_data >> 32) < NUM_SLOTS) { + spdlog::debug("Processing write completion"); uint32_t slot 
= (cqe->user_data >> 32); slots.push(slot); } else { - int reader_idx = cqe->user_data & 0xffff; + int reader_idx = (cqe->user_data >> 16) & 0xffff; + spdlog::debug("Reader idx: {}", reader_idx); readers[reader_idx]->process_io_completion(cqe->user_data); auto task = readers[reader_idx]->get_next_io(); if (task.has_value()) { @@ -436,20 +456,19 @@ class ValueWriterPostMerge { } void run() { + spdlog::debug("Start post-merge ops"); for (uint32_t i=0; ioutput; - int i = 0; for (auto &reader: readers) { auto task = reader->get_next_io(); if (task.has_value()) { task.value().user_data |= (NUM_SLOTS << 32); ring->prepare_read(task.value()); } - i++; } ring->submit_and_wait(0); uint64_t out_file_offset = 0ll; @@ -460,27 +479,50 @@ class ValueWriterPostMerge { } uint32_t slot = slots.front(); slots.pop(); - uint64_t num_records_in_batch = records_emitted + std::min(task->total_records_sorted - records_emitted, WRITE_IO_BYTES / sizeof(RecordType)); + uint64_t num_records_in_batch = std::min(task->total_records_sorted - records_emitted, WRITE_IO_BYTES / sizeof(RecordType)); RecordType *out_buf = reinterpret_cast(write_bufs[slot]); uint64_t offset = 0; for (uint64_t i=0; ikey); uint32_t stream_id = sorted_keys->value; - void *value = readers[stream_id]->get_next_value(); + if (stream_id >= readers.size()) { + spdlog::error("Stream id out of bounds: {}, Records emitted: {}", stream_id, records_emitted); + assert(false); + } + + assert(readers[stream_id] != nullptr); + + void *value = readers[stream_id]->get_next_value_fast(); if (value == nullptr) [[unlikely]] { + // Having another call here just so that we can submit an io pre-emptively + value = readers[stream_id]->get_next_value(); + if (readers[stream_id]->need_submit()) { + auto task = readers[stream_id]->get_next_io(); + assert(task.has_value() && "Task is null"); + task.value().user_data |= (NUM_SLOTS << 32); + ring->prepare_read(task.value()); + } + + spdlog::debug("Encountered page fault for stream: {}, i: {}", 
stream_id, i); while (readers[stream_id]->waiting_for_io()) { poll_completions(); } + ring->submit_and_wait(0); + spdlog::debug("Processed page fault"); + value = readers[stream_id]->get_next_value(); } + assert(value != nullptr && "Value is null"); std::memcpy(&out_buf[i].value, value, RecordType::VALUE_LENGTH); sorted_keys++; } + spdlog::debug("Exhausted write buffer"); ring->prepare_write(out_fd, write_bufs[slot], num_records_in_batch * sizeof(RecordType), out_file_offset, (uint64_t)slot << 32); ring->submit_and_wait(0); out_file_offset += num_records_in_batch * sizeof(RecordType); records_emitted += num_records_in_batch; } + spdlog::debug("End post-merge ops"); } }; diff --git a/src/main.cpp b/src/main.cpp index b13c76c..8a922e6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -184,7 +184,7 @@ void pin_current_thread() { } int main(int argc, char* argv[]) { - spdlog::set_level(spdlog::level::info); + spdlog::set_level(spdlog::level::debug); // pin_current_thread(); ParsedArgs args; int parse_result = parseArguments(argc, argv, args); diff --git a/src/merge.h b/src/merge.h index d342645..4f3cbf9 100644 --- a/src/merge.h +++ b/src/merge.h @@ -52,9 +52,9 @@ std::vector> create_tasks(std::vector void run_merge_avx_512(MergeTask *task, bool *result_sorted) { using Regtype = avx512; - using ItemType = RecordType; - static_assert(std::is_same_v); - static_assert(std::is_same_v); + using ItemType = KeyValue; + // static_assert(std::is_same_v); + // static_assert(std::is_same_v); int num_streams = task->start_ptrs.size(); std::vector end_ptrs; diff --git a/src/sorter.h b/src/sorter.h index 4affd0f..07c7c66 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -444,11 +444,18 @@ void Sorter::sort() { write_intermediate_values(sorted_values, v.size() * RecordType::VALUE_LENGTH, i) ); } else { + spdlog::debug("Starting async read and key extraction"); + key_index_pairs[i].resize(v.size()); + memset(key_index_pairs[i].data(), 0, v.size() * sizeof(KeyIndexPair)); + + uint64_t 
records_per_thread = (config.run_size_bytes / config.num_threads) / ELEM_SIZE; #pragma omp parallel for num_threads(config.num_threads) for (int j=0; j( read_fd, j, config.run_size_bytes / config.num_threads, - reinterpret_cast(v.data()), key_index_pairs[i].data()); + record_ptr, key_index_ptr); } in_place_sort(key_index_pairs[i]); @@ -474,12 +481,14 @@ void Sorter::sort() { for (int j=0; jrun(); } + fds.push_back(write_fd); + spdlog::debug("Done async read and key extraction"); } } free(sorted_values); std::vector> tasks; merge(key_index_pairs, config.run_size_bytes / sizeof(RecordType), &tasks); - if (config.use_async) { + if (false) { write_back_values_post_merge_async(fds, tasks, key_index_pairs); } else { write_back_values_post_merge(fds, tasks, key_index_pairs); @@ -607,6 +616,7 @@ void Sorter::write_back_values_post_merge(std::vector &fds, output_ptr->key = __builtin_bswap64(ptr->key); uint32_t stream_id = ptr->value; void *value = value_readers[stream_id].read_next(); + assert(stream_id < value_readers.size()); std::memcpy(&output_ptr->value, value, RecordType::VALUE_LENGTH); output_ptr++; ptr++; From 0541d19a66994bcff6cf5f1184d1d4e7fde7649b Mon Sep 17 00:00:00 2001 From: proteet Date: Thu, 5 Feb 2026 13:33:37 -0700 Subject: [PATCH 04/11] Async IO functional --- src/async_stages.h | 47 ++++++++++++++++++++++------------------------ src/sorter.h | 3 +-- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/async_stages.h b/src/async_stages.h index 38c3a38..d10e1a2 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -210,13 +210,15 @@ class ValueWriterPostSort { } } - void write_values_to_buf(uint32_t buf_slot) { - uint64_t num_values = WRITE_IO_BYTES / RecordType::VALUE_LENGTH; + /** Fills write_bufs[buf_slot] with values for key_index_pairs[start_index .. start_index+num_values-1]. 
*/ + void write_values_to_buf(uint32_t buf_slot, uint64_t start_index) { + const uint64_t num_values = WRITE_IO_BYTES / RecordType::VALUE_LENGTH; uint8_t *buf = (uint8_t*)write_bufs[buf_slot]; - for (uint64_t i=0; iprepare_write(fd, buf, WRITE_IO_BYTES, file_offset, user_data); + uint64_t start_index = next_write * num_values_per_batch; + write_values_to_buf(slot, start_index); + ring->prepare_write(fd, write_bufs[slot], WRITE_IO_BYTES, file_offset, slot); to_submit++; next_write++; + file_offset += WRITE_IO_BYTES; } bool should_submit = slots.empty() || to_submit >= BATCH_SIZE || (next_write == num_writes); if (should_submit) { - int nr = (next_write == num_writes) ? (num_writes - completed) : slots.empty(); + int nr = (next_write == num_writes) ? static_cast(num_writes - completed) : 1; ring->submit_and_wait(nr); to_submit = 0; @@ -293,7 +296,7 @@ class AsyncValueReader { public: inline bool waiting_for_io() { - spdlog::debug("cur_buf_idx of stream {}: {}", reader_id, cur_buf_idx); + // spdlog::debug("cur_buf_idx of stream {}: {}", reader_id, cur_buf_idx); return states[cur_buf_idx] != BufState::IoCompleted; } @@ -335,7 +338,6 @@ class AsyncValueReader { return nullptr; } if (chunk_offset >= read_chunk_size) { - spdlog::debug("Here: {}", this->reader_id); states[cur_buf_idx] = BufState::Empty; cur_buf_idx = 1 ^ cur_buf_idx; @@ -351,7 +353,7 @@ class AsyncValueReader { inline void process_io_completion(uint64_t user_data) { int buf_idx = (int)(user_data & 1); - spdlog::debug("buf_idx of completed IO: {}", buf_idx); + // spdlog::debug("buf_idx of completed IO: {}", buf_idx); states[buf_idx] = IoCompleted; } @@ -432,19 +434,19 @@ class ValueWriterPostMerge { ValueWriterPostMerge& operator=(const ValueWriterPostMerge&) = delete; inline void poll_completions() { - spdlog::debug("Polling completions"); + // spdlog::debug("Polling completions"); struct io_uring_cqe *cqe; if (!ring->peek_cqe(&cqe)) { ring->wait_cqe(&cqe); } if ((cqe->user_data >> 32) < NUM_SLOTS) { - 
spdlog::debug("Processing write completion"); + // spdlog::debug("Processing write completion"); uint32_t slot = (cqe->user_data >> 32); slots.push(slot); } else { int reader_idx = (cqe->user_data >> 16) & 0xffff; - spdlog::debug("Reader idx: {}", reader_idx); + // spdlog::debug("Reader idx: {}", reader_idx); readers[reader_idx]->process_io_completion(cqe->user_data); auto task = readers[reader_idx]->get_next_io(); if (task.has_value()) { @@ -486,16 +488,14 @@ class ValueWriterPostMerge { for (uint64_t i=0; ikey); uint32_t stream_id = sorted_keys->value; - if (stream_id >= readers.size()) { - spdlog::error("Stream id out of bounds: {}, Records emitted: {}", stream_id, records_emitted); - assert(false); - } - + + assert(stream_id < readers.size()); assert(readers[stream_id] != nullptr); void *value = readers[stream_id]->get_next_value_fast(); if (value == nullptr) [[unlikely]] { - // Having another call here just so that we can submit an io pre-emptively + // Having another call here just so that we can submit an io pre-emptively. This is necessary when switching from + // one buffer to the next. If the IO to the next buffer is complete, it won't refill the earlier buffer. 
value = readers[stream_id]->get_next_value(); if (readers[stream_id]->need_submit()) { auto task = readers[stream_id]->get_next_io(); @@ -504,19 +504,16 @@ class ValueWriterPostMerge { ring->prepare_read(task.value()); } - spdlog::debug("Encountered page fault for stream: {}, i: {}", stream_id, i); while (readers[stream_id]->waiting_for_io()) { poll_completions(); } ring->submit_and_wait(0); - spdlog::debug("Processed page fault"); value = readers[stream_id]->get_next_value(); } assert(value != nullptr && "Value is null"); std::memcpy(&out_buf[i].value, value, RecordType::VALUE_LENGTH); sorted_keys++; } - spdlog::debug("Exhausted write buffer"); ring->prepare_write(out_fd, write_bufs[slot], num_records_in_batch * sizeof(RecordType), out_file_offset, (uint64_t)slot << 32); ring->submit_and_wait(0); diff --git a/src/sorter.h b/src/sorter.h index 07c7c66..9437e3a 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -488,7 +488,7 @@ void Sorter::sort() { free(sorted_values); std::vector> tasks; merge(key_index_pairs, config.run_size_bytes / sizeof(RecordType), &tasks); - if (false) { + if (config.use_async) { write_back_values_post_merge_async(fds, tasks, key_index_pairs); } else { write_back_values_post_merge(fds, tasks, key_index_pairs); @@ -616,7 +616,6 @@ void Sorter::write_back_values_post_merge(std::vector &fds, output_ptr->key = __builtin_bswap64(ptr->key); uint32_t stream_id = ptr->value; void *value = value_readers[stream_id].read_next(); - assert(stream_id < value_readers.size()); std::memcpy(&output_ptr->value, value, RecordType::VALUE_LENGTH); output_ptr++; ptr++; From 82429b00da1067c5690a447d530b2326bf67766c Mon Sep 17 00:00:00 2001 From: proteet Date: Thu, 5 Feb 2026 14:07:12 -0700 Subject: [PATCH 05/11] Add timing information and minor changes --- src/async_stages.h | 10 +++++++--- src/io_uring_utils.h | 4 ++-- src/sorter.h | 19 ++++++++++++++++--- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/async_stages.h b/src/async_stages.h index 
d10e1a2..8bddfc9 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -190,7 +190,7 @@ class ValueWriterPostSort { ): thread_idx(thread_idx), values_per_chunk(values_per_chunk), run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs) { this->fd = dup(fd); - ring = std::make_unique(PREFETCH_DEPTH); + // ring = std::make_unique(PREFETCH_DEPTH); write_bufs.resize(NUM_SLOTS); for (uint32_t i=0; i(PREFETCH_DEPTH); + std::queue slots; for (uint32_t i=0; i= BATCH_SIZE || (next_write == num_writes); if (should_submit) { - int nr = (next_write == num_writes) ? static_cast(num_writes - completed) : 1; + int nr = (next_write == num_writes) ? static_cast(num_writes - completed) : slots.empty(); ring->submit_and_wait(nr); to_submit = 0; @@ -406,7 +408,7 @@ class ValueWriterPostMerge { assert(in_fds.size() == start_ptrs.size()); assert(in_fds.size() > 0 && "in_fds must not be empty"); assert(WRITE_IO_BYTES % RecordType::VALUE_LENGTH == 0); - ring = std::make_unique(PREFETCH_DEPTH); + this->out_fd = dup(out_fd); for (int i=0; i(PREFETCH_DEPTH); + for (uint32_t i=0; i::sort() { memset(key_index_pairs[i].data(), 0, v.size() * sizeof(KeyIndexPair)); uint64_t records_per_thread = (config.run_size_bytes / config.num_threads) / ELEM_SIZE; + auto start_read_extract = std::chrono::high_resolution_clock::now(); #pragma omp parallel for num_threads(config.num_threads) for (int j=0; j::sort() { read_fd, j, config.run_size_bytes / config.num_threads, record_ptr, key_index_ptr); } + auto end_read_extract = std::chrono::high_resolution_clock::now(); + timing_info.async_read_and_extract_keys += std::chrono::duration_cast(end_read_extract - start_read_extract).count() / 1000.0f; in_place_sort(key_index_pairs[i]); std::string file_name = config.intermediate_file_prefix + "-chunk-" + std::to_string(i); @@ -477,10 +484,13 @@ void Sorter::sort() { writers.push_back(std::move(writer)); } + auto start_writers = std::chrono::high_resolution_clock::now(); #pragma omp 
parallel for num_threads(config.num_threads) for (int j=0; jrun(); } + auto end_writers = std::chrono::high_resolution_clock::now(); + timing_info.async_value_writer_post_sort += std::chrono::duration_cast(end_writers - start_writers).count() / 1000.0f; fds.push_back(write_fd); spdlog::debug("Done async read and key extraction"); } @@ -636,7 +646,7 @@ void Sorter::write_back_values_post_merge(std::vector &fds, close(fd); } auto write_end = std::chrono::high_resolution_clock::now(); - timing_info.final_output_write += std::chrono::duration_cast(write_end - write_start).count() / 1000.0f; + timing_info.output_write += std::chrono::duration_cast(write_end - write_start).count() / 1000.0f; for (size_t i = 0; i < output_ptrs.size(); i++) { free(output_ptrs[i]); @@ -666,8 +676,11 @@ void Sorter::write_back_values_post_merge_async(std::vector &fd close(out_fd); } + auto start = std::chrono::high_resolution_clock::now(); #pragma omp parallel for num_threads(config.num_threads) for (int i=0; irun(); } + auto end = std::chrono::high_resolution_clock::now(); + timing_info.value_write_back_post_merge = std::chrono::duration_cast(end - start).count() / 1000.0f; } \ No newline at end of file From 35b18aab4097470acfc65cd1ae618e5a1b191bec Mon Sep 17 00:00:00 2001 From: proteet Date: Mon, 9 Feb 2026 14:47:36 -0700 Subject: [PATCH 06/11] Use a larger ring for the last phase --- src/async_stages.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/async_stages.h b/src/async_stages.h index 8bddfc9..a23b18d 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -461,7 +461,8 @@ class ValueWriterPostMerge { void run() { spdlog::debug("Start post-merge ops"); - ring = std::make_unique(PREFETCH_DEPTH); + uint32_t ring_num_entries = NUM_SLOTS + readers.size(); + ring = std::make_unique(ring_num_entries); for (uint32_t i=0; i Date: Wed, 11 Feb 2026 11:27:52 -0600 Subject: [PATCH 07/11] Improve timing info display --- src/async_stages.h | 15 +++++++------- 
src/sorter.h | 49 +++++++++++++++------------------------------- 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/src/async_stages.h b/src/async_stages.h index a23b18d..50b00ae 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -177,24 +177,23 @@ class ValueWriterPostSort { KeyIndexPair* key_index_pairs; // Sorted key-index pairs -public: - static constexpr uint64_t WRITE_IO_BYTES = RecordType::VALUE_LENGTH * io_uring_utils::BLOCK_ALIGN; - - static constexpr uint32_t NUM_SLOTS = PREFETCH_DEPTH * 4; + static constexpr uint32_t NUM_SLOTS = 16; static constexpr uint32_t BATCH_SIZE = 1; - + + static constexpr uint64_t WRITE_IO_BYTES = RecordType::VALUE_LENGTH * io_uring_utils::BLOCK_ALIGN; +public: explicit ValueWriterPostSort(int fd, int thread_idx, uint64_t values_per_chunk, uint8_t* input_buffer, int run_idx, KeyIndexPair* key_index_pairs ): thread_idx(thread_idx), values_per_chunk(values_per_chunk), run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs) { this->fd = dup(fd); - // ring = std::make_unique(PREFETCH_DEPTH); write_bufs.resize(NUM_SLOTS); for (uint32_t i=0; i(PREFETCH_DEPTH); + ring = std::make_unique(NUM_SLOTS); std::queue slots; for (uint32_t i=0; i &run, void *sorted_values, std::vector &key_index_pairs, int run_id); - /** Same as generate_run_for_merge_sort but assumes key_index_pairs already filled (e.g. by Task1). 
*/ - void generate_run_for_merge_sort_after_keys(std::vector &run, - void *sorted_values, std::vector &key_index_pairs, int run_id); - void write_back_values_post_merge(std::vector &fds, std::vector> &tasks, std::vector> &key_index_pairs); @@ -135,20 +131,28 @@ class Sorter { void sort(); - void print_timing_stats() { + void print_timing_stats_sync() { spdlog::info("Read input: {} ms", timing_info.input_read); - spdlog::info("Read merge phase: {} ms", timing_info.merge_read); - spdlog::info("Write sorted runs to disk: {} ms", timing_info.intermediate_write); - spdlog::info("Write output to disk: {} ms", timing_info.output_write); - spdlog::info("Sorting time: {} ms", timing_info.sort_time); spdlog::info("Key-value separation: {} ms", timing_info.value_separation); + spdlog::info("Sorting time: {} ms", timing_info.sort_time); spdlog::info("Value write back (for one-pass sort): {} ms", timing_info.value_write_back); - spdlog::info("Creation of intermediate value runs: {} ms", timing_info.create_intermediate_value_runs); + spdlog::info("Creation of intermediate value runs (for merge sort): {} ms", timing_info.create_intermediate_value_runs); + spdlog::info("Write sorted runs to disk: {} ms", timing_info.intermediate_write); + spdlog::info("Read merge phase: {} ms", timing_info.merge_read); spdlog::info("In-memory merge: {} ms", timing_info.merge_in_memory); + spdlog::info("Write output to disk: {} ms", timing_info.output_write); spdlog::info("Write back values (after merge sort): {} ms", timing_info.value_write_back_post_merge); + } + + void print_timing_stats() { if (config.use_async) { - spdlog::info("Async read and extract keys: {} ms", timing_info.async_read_and_extract_keys); - spdlog::info("Async value writer post sort: {} ms", timing_info.async_value_writer_post_sort); + spdlog::info("Read and extract keys: {} ms", timing_info.async_read_and_extract_keys); + spdlog::info("Sorting time: {} ms", timing_info.sort_time); + spdlog::info("Accumulate values and write 
intermediate runs to disk: {} ms", timing_info.async_value_writer_post_sort); + spdlog::info("In-memory merge: {} ms", timing_info.merge_in_memory); + spdlog::info("Write back values (after merge sort): {} ms", timing_info.value_write_back_post_merge); + } else { + print_timing_stats_sync(); } } }; @@ -384,27 +388,6 @@ void Sorter::generate_run_for_merge_sort( timing_info.create_intermediate_value_runs += std::chrono::duration_cast(end - start).count() / 1000.0f; } -template -void Sorter::generate_run_for_merge_sort_after_keys( - std::vector &run, - void *sorted_values, std::vector &key_index_pairs, int run_id) { - assert(config.separate_values); - assert((run.size() * sizeof(RecordType)) % 64 == 0); - assert(key_index_pairs.size() == run.size()); - - in_place_sort(key_index_pairs); - - auto start = std::chrono::high_resolution_clock::now(); - #pragma omp parallel for num_threads(config.num_threads) - for (int i=0; i(end - start).count() / 1000.0f; -} - template void Sorter::sort() { // Ensure that elements don't span across on-disk blocks From 9fd7f4853ff880a35f75f1f23dc1fb275d63e7ca Mon Sep 17 00:00:00 2001 From: Proteet Paul Date: Wed, 11 Feb 2026 22:24:05 -0600 Subject: [PATCH 08/11] Track IO processing time during async stages --- src/async_stages.h | 56 +++++++++++++++++++++++++++++++++++++++++----- src/sorter.h | 13 +++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/src/async_stages.h b/src/async_stages.h index 50b00ae..6569e68 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -86,6 +86,8 @@ void read_run_and_extract_keys(int fd, uint64_t thread_id, uint64_t run_size_byt using KeyIndexPair = KeyValuePair; constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + uint64_t io_processing_time = 0; + assert(Config::BLOCK_SIZE_ALIGN % ELEM_SIZE == 0); assert(run_size_bytes % ELEM_SIZE == 0); @@ -119,6 +121,7 @@ void read_run_and_extract_keys(int fd, uint64_t thread_id, uint64_t run_size_byt free_slots--; chunk_id++; } + auto io_start 
= std::chrono::high_resolution_clock::now(); ring.submit_and_wait(1); struct io_uring_cqe* cqe = nullptr; @@ -136,6 +139,9 @@ void read_run_and_extract_keys(int fd, uint64_t thread_id, uint64_t run_size_byt continue; } free_slots++; + auto io_end = std::chrono::high_resolution_clock::now(); + + io_processing_time += std::chrono::duration_cast(io_end - io_start).count(); uint64_t done_chunk_offset = chunk_id * DEFAULT_READ_CHUNK_BYTES; uint64_t done_chunk_bytes = std::min(DEFAULT_READ_CHUNK_BYTES, run_size_bytes - done_chunk_offset); @@ -151,9 +157,30 @@ void read_run_and_extract_keys(int fd, uint64_t thread_id, uint64_t run_size_byt auto end = std::chrono::high_resolution_clock::now(); auto time_elapsed = std::chrono::duration_cast(end - start).count() / 1000.0f; - spdlog::debug("Run {}: read+extract {} ms, {} chunks", thread_id, time_elapsed, num_chunks); + spdlog::debug("Run {}: read+extract {} ms, {} chunks, io_processing time: {} ms", + thread_id, time_elapsed, num_chunks, io_processing_time/1000.0f); } +// template +// class InputReader { +// using KeyIndexPair = KeyValuePair; +// static constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + +// int fd; +// int thread_idx; +// RecordType* records; +// KeyIndexPair* key_index_pairs; + +// public: +// explicit InputReader(int fd, int thread_idx, RecordType *records, KeyIndexPair *key_index_pairs) +// : fd(fd), thread_idx(thread_idx), records(records), key_index_pairs(key_index_pairs) { + +// } +// void run() { + +// } +// }; + /** * Accumulates values into in-memory buffers and writes them out in batches. This ensures overlap between cpu and io, while keeping the memory footprint low. 
*/ @@ -182,12 +209,15 @@ class ValueWriterPostSort { static constexpr uint32_t BATCH_SIZE = 1; static constexpr uint64_t WRITE_IO_BYTES = RecordType::VALUE_LENGTH * io_uring_utils::BLOCK_ALIGN; -public: +public: + uint64_t io_processing_time_us; + explicit ValueWriterPostSort(int fd, int thread_idx, uint64_t values_per_chunk, uint8_t* input_buffer, int run_idx, KeyIndexPair* key_index_pairs ): thread_idx(thread_idx), values_per_chunk(values_per_chunk), - run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs) { + run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs), + io_processing_time_us(0ll) { this->fd = dup(fd); write_bufs.resize(NUM_SLOTS); for (uint32_t i=0; i= BATCH_SIZE || (next_write == num_writes); if (should_submit) { int nr = (next_write == num_writes) ? static_cast(num_writes - completed) : slots.empty(); @@ -270,6 +301,8 @@ class ValueWriterPostSort { ring->mark_cqe_seen(cqe); } } + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); } spdlog::debug("Done writing out values post sort"); } @@ -397,14 +430,15 @@ class ValueWriterPostMerge { std::vector> readers; -public: static constexpr uint64_t NUM_SLOTS = PREFETCH_DEPTH * 4; static constexpr uint32_t BATCH_SIZE = 1; +public: + uint64_t io_processing_time_us; explicit ValueWriterPostMerge(MergeTask *task, int out_fd, std::vector &in_fds, std::vector &start_ptrs) - : task(task) { + : task(task), io_processing_time_us(0ll) { assert(in_fds.size() == start_ptrs.size()); assert(in_fds.size() > 0 && "in_fds must not be empty"); assert(WRITE_IO_BYTES % RecordType::VALUE_LENGTH == 0); @@ -483,7 +517,10 @@ class ValueWriterPostMerge { while (records_emitted < task->total_records_sorted) { while (slots.empty()) { - poll_completions(); + auto io_start = std::chrono::high_resolution_clock::now(); + poll_completions(); + auto io_end = std::chrono::high_resolution_clock::now(); + 
io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); } uint32_t slot = slots.front(); slots.pop(); @@ -500,6 +537,7 @@ class ValueWriterPostMerge { void *value = readers[stream_id]->get_next_value_fast(); if (value == nullptr) [[unlikely]] { + auto io_start = std::chrono::high_resolution_clock::now(); // Having another call here just so that we can submit an io pre-emptively. This is necessary when switching from // one buffer to the next. If the IO to the next buffer is complete, it won't refill the earlier buffer. value = readers[stream_id]->get_next_value(); @@ -514,15 +552,21 @@ class ValueWriterPostMerge { poll_completions(); } ring->submit_and_wait(0); + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); value = readers[stream_id]->get_next_value(); } assert(value != nullptr && "Value is null"); std::memcpy(&out_buf[i].value, value, RecordType::VALUE_LENGTH); sorted_keys++; } + auto io_start = std::chrono::high_resolution_clock::now(); ring->prepare_write(out_fd, write_bufs[slot], num_records_in_batch * sizeof(RecordType), out_file_offset, (uint64_t)slot << 32); ring->submit_and_wait(0); + + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); out_file_offset += num_records_in_batch * sizeof(RecordType); records_emitted += num_records_in_batch; } diff --git a/src/sorter.h b/src/sorter.h index da61808..ea10632 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -474,6 +474,13 @@ void Sorter::sort() { } auto end_writers = std::chrono::high_resolution_clock::now(); timing_info.async_value_writer_post_sort += std::chrono::duration_cast(end_writers - start_writers).count() / 1000.0f; + + auto total_io_processing_time = 0ll; + for (int j=0; jio_processing_time_us; + } + + spdlog::info("Average IO processing time in post-sort step: {} ms", total_io_processing_time / 
(1000.0f * config.num_threads)); fds.push_back(write_fd); spdlog::debug("Done async read and key extraction"); } @@ -664,6 +671,12 @@ void Sorter::write_back_values_post_merge_async(std::vector &fd for (int i=0; irun(); } + + auto total_io_processing_time = 0ll; + for (int i=0; iio_processing_time_us; + } auto end = std::chrono::high_resolution_clock::now(); + spdlog::info("Average IO processing time in post-merge step: {} ms", total_io_processing_time / (1000.0f * config.num_threads)); timing_info.value_write_back_post_merge = std::chrono::duration_cast(end - start).count() / 1000.0f; } \ No newline at end of file From cee06600799573d285870167d71cf43395ea8813 Mon Sep 17 00:00:00 2001 From: Proteet Paul Date: Wed, 11 Feb 2026 23:22:57 -0600 Subject: [PATCH 09/11] Fix bug in post-merge step --- src/async_stages.h | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/async_stages.h b/src/async_stages.h index 6569e68..3ef0917 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -334,9 +334,15 @@ class AsyncValueReader { return states[cur_buf_idx] != BufState::IoCompleted; } - AsyncValueReader(int fd, uint64_t start_offset, uint64_t value_length, uint64_t read_chunk_size, int reader_id): - fd(fd), file_offset(start_offset), chunk_offset(0ll), value_length_bytes(value_length), reader_id(reader_id), - read_chunk_size(read_chunk_size) { + /** logical_start_offset is the byte offset in the file where the logical stream starts. + * For O_DIRECT, the actual read offset is aligned down to BLOCK_ALIGN; the first buffer + * skips (logical_start_offset % BLOCK_ALIGN) bytes. 
*/ + AsyncValueReader(int fd, uint64_t logical_start_offset, uint64_t value_length, uint64_t read_chunk_size, int reader_id): + fd(fd), value_length_bytes(value_length), reader_id(reader_id), read_chunk_size(read_chunk_size) { + uint64_t aligned_offset = (logical_start_offset / io_uring_utils::BLOCK_ALIGN) * io_uring_utils::BLOCK_ALIGN; + uint64_t initial_skip_bytes = logical_start_offset - aligned_offset; + file_offset = aligned_offset; + chunk_offset = initial_skip_bytes; for (int i=0; i<2; i++) { int ret = posix_memalign(&ptr[i], 4096, read_chunk_size); assert(ret == 0); @@ -447,8 +453,9 @@ class ValueWriterPostMerge { for (int i=0; istart_ptrs[i]) * RecordType::VALUE_LENGTH; - auto reader = std::make_unique(fd, file_offset, RecordType::VALUE_LENGTH, READ_IO_CHUNK, i); + uint64_t logical_start = (start_ptrs[i] - task->start_ptrs[i]) * RecordType::VALUE_LENGTH; + auto reader = std::make_unique(fd, logical_start, RecordType::VALUE_LENGTH, + READ_IO_CHUNK, i); readers.push_back(std::move(reader)); } From 574e12a11eb0c7fa39a4247ce1677def131619f4 Mon Sep 17 00:00:00 2001 From: Proteet Paul Date: Thu, 12 Feb 2026 07:45:01 -0600 Subject: [PATCH 10/11] Fix bug and add script for plotting disk bandwidth --- src/async_stages.h | 2 +- tools/plot_disk_bw.py | 199 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 200 insertions(+), 1 deletion(-) create mode 100644 tools/plot_disk_bw.py diff --git a/src/async_stages.h b/src/async_stages.h index 3ef0917..18dff09 100644 --- a/src/async_stages.h +++ b/src/async_stages.h @@ -453,7 +453,7 @@ class ValueWriterPostMerge { for (int i=0; istart_ptrs[i]) * RecordType::VALUE_LENGTH; + uint64_t logical_start = (task->start_ptrs[i] - start_ptrs[i]) * RecordType::VALUE_LENGTH; auto reader = std::make_unique(fd, logical_start, RecordType::VALUE_LENGTH, READ_IO_CHUNK, i); readers.push_back(std::move(reader)); diff --git a/tools/plot_disk_bw.py b/tools/plot_disk_bw.py new file mode 100644 index 0000000..a4facf3 --- /dev/null +++ 
b/tools/plot_disk_bw.py @@ -0,0 +1,199 @@ +import argparse +import sys +import numpy as np +import matplotlib.pyplot as plt +from collections import defaultdict + +""" +Biosnoop output is in the following format: +TIME(s) COMM PID DISK T SECTOR BYTES QUE(ms) LAT(ms) +""" +def parse_biosnoop(file, pid): + latencies = [] + io_sizes = [] + trace_data = [] # Store trace data: (sector, io_size, queued_time, latency) + io_events = [] # Store IO events with time: (time_s, io_type, bytes) + + for line in file: + line = line.strip() + if not line or line.startswith("TIME(") or line.startswith("--"): + continue + parts = line.split() + if len(parts) < 8: + continue + try: + time_s = float(parts[0]) # TIME column (in seconds) + line_pid = int(parts[2]) + comm = parts[1] + io_type = parts[4] # T column: R for read, W for write + que_ms = float(parts[-2]) # QUE column + lat_ms = float(parts[-1]) # LAT column + io_size = int(parts[-3]) # BYTES column + # Extract sector number - typically in the 6th column + sector = int(parts[5]) + except (ValueError, IndexError): + continue + if line_pid == pid: + if timing_type == "queued": + latencies.append(que_ms) + elif timing_type == "block_device": + latencies.append(lat_ms) + elif timing_type == "total": + latencies.append(lat_ms + que_ms) + io_sizes.append(io_size) + # Store trace data: sector, io_size, queued_time, latency + trace_data.append((sector, io_size, que_ms, lat_ms)) + # Store IO event with time and type for bandwidth calculation + io_events.append((time_s, io_type, io_size)) + return latencies, io_sizes, trace_data, io_events + +def print_percentiles(latencies): + if not latencies: + print(f"No I/O events found.") + return + percentiles = [50, 70, 90, 99] + values = np.percentile(latencies, percentiles) + + for p, v in zip(percentiles, values): + print(f" p{p}: {v:.3f}") + print(f"Mean: {np.mean(latencies)}") + print(f" count: {len(latencies)} events") + +def calculate_bandwidth(io_events, interval_ms): + """ + Calculate 
read and write bandwidth for each time interval. + + Args: + io_events: List of tuples (time_s, io_type, bytes) + interval_ms: Time interval in milliseconds + + Returns: + Tuple of (time_intervals, read_bandwidths, write_bandwidths) + Bandwidths are in MB/s + """ + if not io_events: + return [], [], [] + + # Convert interval from ms to seconds + interval_s = interval_ms / 1000.0 + + # Find time range + times = [event[0] for event in io_events] + min_time = min(times) + max_time = max(times) + + # Create time bins + num_intervals = int(np.ceil((max_time - min_time) / interval_s)) + time_intervals = [] + read_bytes = defaultdict(float) + write_bytes = defaultdict(float) + + # Group IO events by time interval + for time_s, io_type, bytes_count in io_events: + # Calculate which interval this event belongs to + interval_idx = int((time_s - min_time) / interval_s) + interval_start = min_time + interval_idx * interval_s + + if interval_start not in time_intervals: + time_intervals.append(interval_start) + + # Accumulate bytes by type + if io_type.upper() == 'R': + read_bytes[interval_start] += bytes_count + elif io_type.upper() == 'W': + write_bytes[interval_start] += bytes_count + + # Sort time intervals + time_intervals = sorted(set(time_intervals)) + + # Calculate bandwidth in MB/s for each interval + read_bandwidths = [] + write_bandwidths = [] + + for interval_start in time_intervals: + read_bw = (read_bytes[interval_start] / (1024 * 1024)) / interval_s # MB/s + write_bw = (write_bytes[interval_start] / (1024 * 1024)) / interval_s # MB/s + read_bandwidths.append(read_bw) + write_bandwidths.append(write_bw) + + return time_intervals, read_bandwidths, write_bandwidths + +def plot_bandwidth(time_intervals, read_bandwidths, write_bandwidths, interval_ms, output_file=None, pid=None, input_file=None): + """ + Plot read and write bandwidth over time. 
+ + Args: + time_intervals: List of interval start times (in seconds) + read_bandwidths: List of read bandwidths (in MB/s) + write_bandwidths: List of write bandwidths (in MB/s) + interval_ms: Time interval in milliseconds (for title) + output_file: Output file path for the plot (if None, generates default name) + pid: Process ID (for default filename generation) + input_file: Input file path (for default filename generation) + """ + if not time_intervals: + print("No data to plot.") + return + + plt.figure(figsize=(12, 6)) + + # Convert time intervals to relative time (milliseconds from start) + start_time = time_intervals[0] + relative_times = [(t - start_time) * 1000 for t in time_intervals] # Convert to ms + + plt.plot(relative_times, read_bandwidths, label='Read', color='blue', linewidth=2) + plt.plot(relative_times, write_bandwidths, label='Write', color='red', linewidth=2) + + plt.xlabel('Time (ms)', fontsize=12) + plt.ylabel('Bandwidth (MB/s)', fontsize=12) + plt.title(f'Disk I/O Bandwidth Over Time (Interval: {interval_ms} ms)', fontsize=14) + plt.legend(fontsize=11) + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + # Generate default filename if not provided + if output_file is None: + if input_file: + base_name = input_file.rsplit('.', 1)[0] if '.' 
in input_file else input_file + output_file = f"{base_name}_bandwidth_pid{pid}_interval{interval_ms}ms.png" + else: + output_file = f"bandwidth_pid{pid}_interval{interval_ms}ms.png" + + plt.savefig(output_file, dpi=300, bbox_inches='tight') + print(f"Plot saved to {output_file}") + plt.close() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Parse biosnoop-bpfcc output and plot bandwidth over time.") + parser.add_argument("--pid", type=int, required=True, help="PID to filter") + parser.add_argument("--file", type=str, help="Path to biosnoop output file (default: stdin)") + parser.add_argument("--interval", type=float, required=True, help="Time interval in milliseconds for bandwidth calculation") + parser.add_argument("--output", type=str, help="Output file path for the plot (default: auto-generated based on input file and PID)") + parser.add_argument("--timing-type", type=str, default="total", choices=["queued", "block_device", "total"], + help="Type of latency to calculate (default: total)") + args = parser.parse_args() + + global timing_type + timing_type = args.timing_type + + if args.file: + with open(args.file, "r") as f: + latencies, io_sizes, trace_data, io_events = parse_biosnoop(f, args.pid) + else: + latencies, io_sizes, trace_data, io_events = parse_biosnoop(sys.stdin, args.pid) + + # Calculate and plot bandwidth + if io_events: + time_intervals, read_bandwidths, write_bandwidths = calculate_bandwidth(io_events, args.interval) + plot_bandwidth(time_intervals, read_bandwidths, write_bandwidths, args.interval, args.output, args.pid, args.file) + else: + print("No I/O events found for plotting.") + + # Print latency statistics if available + if latencies: + print(f"\n{args.timing_type.replace('_', ' ').title()} latency percentiles (ms):") + print_percentiles(latencies) + + print(f"\nIO sizes percentiles:") + print_percentiles(io_sizes) \ No newline at end of file From 81629c66c4d7defb52f84621394979f639f48d7f Mon Sep 17 00:00:00 
2001 From: Proteet Paul Date: Thu, 12 Feb 2026 16:05:46 -0600 Subject: [PATCH 11/11] Fix in-place sort --- bpftrace.md | 63 ++++++++++++++++ src/sorter.h | 44 +++++------ tools/plot_bpftrace_bw.py | 151 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 232 insertions(+), 26 deletions(-) create mode 100644 bpftrace.md create mode 100644 tools/plot_bpftrace_bw.py diff --git a/bpftrace.md b/bpftrace.md new file mode 100644 index 0000000..a8ea846 --- /dev/null +++ b/bpftrace.md @@ -0,0 +1,63 @@ +# bpftrace: Per-block device read/write byte totals + +```bash +sudo bpftrace -e ' +tracepoint:block:block_rq_complete +{ + $bytes = args->nr_sector * 512; + /* Cast the string to a char pointer and dereference to get the first byte */ + $c = *(uint8 *)args->rwbs; + $dev = args->dev; + + @read[$dev] += ($c == 82 ? $bytes : 0); /* 82 is "R" */ + @write[$dev] += ($c == 87 ? $bytes : 0); /* 87 is "W" */ + @total[$dev] += $bytes; + @seen[$dev] = 1; +} +' +``` + +# bpftrace: Block IO bandwidth tracking at 10ms intervals + +```bash +sudo bpftrace -e ' +tracepoint:block:block_rq_complete +{ + $bytes = args->nr_sector * 512; + /* Cast the string to a char pointer and dereference to get the first byte */ + $c = *(uint8 *)args->rwbs; + + /* Accumulate bytes for reads and writes */ + @read_bytes += ($c == 82 ? $bytes : 0); + @write_bytes += ($c == 87 ? 
$bytes : 0); +} + +interval:ms:10 +{ + /* Calculate bandwidth in MB/s using integer arithmetic */ + /* bytes in 10ms -> bytes_per_second = bytes * 100 */ + /* MB_per_second = (bytes * 100) / 1048576 */ + $read_bps = @read_bytes * 100; /* bytes per second */ + $write_bps = @write_bytes * 100; + $read_mbps = $read_bps / 1048576; /* MB per second (integer division) */ + $write_mbps = $write_bps / 1048576; + + /* Time in seconds (elapsed is in nanoseconds) */ + $time_sec = elapsed / 1000000000; + $time_ms_remainder = elapsed % 1000000000; + $time_ms = $time_ms_remainder / 1000000; + + printf("%d.%03d: Read: %d MB/s, Write: %d MB/s\n", + $time_sec, $time_ms, $read_mbps, $write_mbps); + + /* Reset counters for next interval */ + @read_bytes = 0; + @write_bytes = 0; +} + +END +{ + printf("\nTracing complete.\n"); +} +' +``` diff --git a/src/sorter.h b/src/sorter.h index ea10632..3bb6723 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -103,14 +103,13 @@ class Sorter { void in_place_std_sort(std::vector> &vec); void generate_key_index_pairs(std::vector &vec, - std::vector &key_index_pairs, - void* value_buffer, bool separate_values); + std::vector &key_index_pairs); void write_back_values(std::vector &original, std::vector &key_index_pairs, - void *value_buffer); + std::vector &result); - void sort_single_run(std::vector &run); + void sort_single_run(std::vector &run, std::vector &output); void generate_run_for_merge_sort(std::vector &run, void *sorted_values, std::vector &key_index_pairs, int run_id); @@ -159,7 +158,7 @@ class Sorter { template void Sorter::generate_key_index_pairs(std::vector &vec, - std::vector &key_index_pairs, void* value_buffer, bool separate_values) { + std::vector &key_index_pairs) { spdlog::debug("Start separating values from keys"); auto start = std::chrono::high_resolution_clock::now(); #pragma omp parallel for num_threads(config.num_threads) @@ -167,10 +166,6 @@ void Sorter::generate_key_index_pairs(std::vector &vec, key_index_pairs[i].key = 
__builtin_bswap64(vec[i].key); key_index_pairs[i].set_value(&i); uint64_t offset = i * RecordType::VALUE_LENGTH; - - if (separate_values) { - std::memcpy((uint8_t*)value_buffer + offset, &vec[i].value, RecordType::VALUE_LENGTH); - } } auto end = std::chrono::high_resolution_clock::now(); timing_info.value_separation += std::chrono::duration_cast(end - start).count() / 1000.0f; @@ -181,18 +176,18 @@ void Sorter::generate_key_index_pairs(std::vector &vec, template void Sorter::write_back_values(std::vector &original, std::vector &key_index_pairs, - void *value_buffer) { + std::vector &result) { int miss_fd = init_perf_counter(PERF_COUNT_HW_CACHE_MISSES); int ref_fd = init_perf_counter(PERF_COUNT_HW_CACHE_REFERENCES); spdlog::debug("Start writing back values"); auto start = std::chrono::high_resolution_clock::now(); - // TODO(): Maybe unroll loop?? + #pragma omp parallel for num_threads(config.num_threads) for (uint64_t i=0; i(end - start).count() / 1000.0f; @@ -346,20 +341,15 @@ void Sorter::sort_independently(std::vector -void Sorter::sort_single_run(std::vector &run) { +void Sorter::sort_single_run(std::vector &run, std::vector &output) { if (config.separate_values) { std::vector key_index_pairs(run.size()); - void *value_buffer; - int ret = posix_memalign(&value_buffer, 64, RecordType::VALUE_LENGTH * run.size()); - assert(ret == 0); - memset(value_buffer, 0, RecordType::VALUE_LENGTH * run.size()); - generate_key_index_pairs(run, key_index_pairs, value_buffer, true); + generate_key_index_pairs(run, key_index_pairs); in_place_sort(key_index_pairs); - write_back_values(run, key_index_pairs, value_buffer); + write_back_values(run, key_index_pairs, output); - spdlog::info("is sorted: {}", std::is_sorted(run.begin(), run.end())); - free(value_buffer); + spdlog::info("is sorted: {}", std::is_sorted(output.begin(), output.end())); } else { in_place_sort(run); } @@ -374,7 +364,7 @@ void Sorter::generate_run_for_merge_sort( key_index_pairs.resize(run.size()); 
memset(key_index_pairs.data(), 0, run.size() * sizeof(KeyIndexPair)); - generate_key_index_pairs(run, key_index_pairs, nullptr, false); + generate_key_index_pairs(run, key_index_pairs); in_place_sort(key_index_pairs); auto start = std::chrono::high_resolution_clock::now(); @@ -399,8 +389,10 @@ void Sorter::sort() { uint64_t num_runs = config.num_runs(); if (num_runs == 1) { auto v = read_input_chunk(0); - sort_single_run(v); - write_output_chunk(v.data(), config.file_size_bytes); + std::vector output(v.size()); + memset((void*) output.data(), 0, v.size() * ELEM_SIZE); + sort_single_run(v, output); + write_output_chunk(output.data(), config.file_size_bytes); return; } diff --git a/tools/plot_bpftrace_bw.py b/tools/plot_bpftrace_bw.py new file mode 100644 index 0000000..09bb453 --- /dev/null +++ b/tools/plot_bpftrace_bw.py @@ -0,0 +1,151 @@ +import argparse +import sys +import re +import matplotlib.pyplot as plt + +""" +Parse bpftrace output and plot read/write bandwidth over time. + +bpftrace output format: +<seconds>.<milliseconds>: Read: <read_mbps> MB/s, Write: <write_mbps> MB/s +Example: 1.234: Read: 100 MB/s, Write: 50 MB/s +""" + +def parse_bpftrace(file): + """ + Parse bpftrace bandwidth output. 
+ + Args: + file: File object or stdin + + Returns: + Tuple of (times, read_bandwidths, write_bandwidths) + times are in seconds, bandwidths are in MB/s + """ + times = [] + read_bandwidths = [] + write_bandwidths = [] + + # Pattern to match: "1.234: Read: 100 MB/s, Write: 50 MB/s" + # bpftrace uses %d.%03d format, so milliseconds are always 3 digits + pattern = r'(\d+)\.(\d{3}):\s+Read:\s+(\d+)\s+MB/s,\s+Write:\s+(\d+)\s+MB/s' + + for line in file: + line = line.strip() + if not line or line.startswith("Tracing complete"): + continue + + match = re.match(pattern, line) + if match: + time_sec = int(match.group(1)) + time_ms = int(match.group(2)) + read_bw = int(match.group(3)) + write_bw = int(match.group(4)) + + # Convert to seconds (e.g., 1.234 -> 1.234 seconds) + time_total = time_sec + time_ms / 1000.0 + + times.append(time_total) + read_bandwidths.append(read_bw) + write_bandwidths.append(write_bw) + + return times, read_bandwidths, write_bandwidths + +def plot_bandwidth(times, read_bandwidths, write_bandwidths, output_file=None, input_file=None, start_time=None, end_time=None): + """ + Plot read and write bandwidth over time. 
+ + Args: + times: List of timestamps (in seconds) + read_bandwidths: List of read bandwidths (in MB/s) + write_bandwidths: List of write bandwidths (in MB/s) + output_file: Output file path for the plot (if None, generates default name) + input_file: Input file path (for default filename generation) + start_time: Start time in seconds to clip the plot (None = no clipping) + end_time: End time in seconds to clip the plot (None = no clipping) + """ + if not times: + print("No data to plot.") + return + + # Filter data based on start_time and end_time + filtered_times = [] + filtered_read = [] + filtered_write = [] + + for i, t in enumerate(times): + if start_time is not None and t < start_time: + continue + if end_time is not None and t > end_time: + continue + filtered_times.append(t) + filtered_read.append(read_bandwidths[i]) + filtered_write.append(write_bandwidths[i]) + + if not filtered_times: + print("No data to plot after clipping.") + return + + # Calculate total bandwidth (read + write) + filtered_total = [r + w for r, w in zip(filtered_read, filtered_write)] + + plt.figure(figsize=(12, 6)) + + # Convert times to relative time (milliseconds from start) + plot_start_time = filtered_times[0] + relative_times = [(t - plot_start_time) * 1000 for t in filtered_times] # Convert to ms + + # Plot total bandwidth first with dashed line and lower z-order so read/write are on top + plt.plot(relative_times, filtered_total, label='Total', color='green', linewidth=1, + linestyle='--', alpha=0.7, zorder=1) + # Plot read and write on top with higher z-order + plt.plot(relative_times, filtered_read, label='Read', color='blue', linewidth=1, zorder=2) + plt.plot(relative_times, filtered_write, label='Write', color='red', linewidth=1, zorder=2) + + plt.xlabel('Time (ms)', fontsize=12) + plt.ylabel('Bandwidth (MB/s)', fontsize=12) + plt.title('Disk I/O Bandwidth Over Time (from bpftrace)', fontsize=14) + plt.legend(fontsize=11) + plt.grid(True, alpha=0.3) + + plt.tight_layout() 
+ + # Generate default filename if not provided + if output_file is None: + if input_file: + base_name = input_file.rsplit('.', 1)[0] if '.' in input_file else input_file + output_file = f"{base_name}_bandwidth.png" + else: + output_file = "bpftrace_bandwidth.png" + + plt.savefig(output_file, dpi=300, bbox_inches='tight') + print(f"Plot saved to {output_file}") + plt.close() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Parse bpftrace bandwidth output and plot read/write bandwidth over time.") + parser.add_argument("--file", type=str, help="Path to bpftrace output file (default: stdin)") + parser.add_argument("--output", type=str, help="Output file path for the plot (default: auto-generated based on input file)") + parser.add_argument("--start", type=float, help="Start time in seconds to clip the plot (default: no clipping)") + parser.add_argument("--end", type=float, help="End time in seconds to clip the plot (default: no clipping)") + args = parser.parse_args() + + if args.file: + with open(args.file, "r") as f: + times, read_bandwidths, write_bandwidths = parse_bpftrace(f) + else: + times, read_bandwidths, write_bandwidths = parse_bpftrace(sys.stdin) + + if times: + original_count = len(times) + plot_bandwidth(times, read_bandwidths, write_bandwidths, args.output, args.file, args.start, args.end) + if args.start is not None or args.end is not None: + # Count filtered points + filtered_count = sum(1 for t in times + if (args.start is None or t >= args.start) and + (args.end is None or t <= args.end)) + print(f"Processed {original_count} data points ({filtered_count} after clipping)") + else: + print(f"Processed {original_count} data points") + else: + print("No bandwidth data found in input.")