diff --git a/bpftrace.md b/bpftrace.md new file mode 100644 index 0000000..a8ea846 --- /dev/null +++ b/bpftrace.md @@ -0,0 +1,63 @@ +# bpftrace: Per-block device read/write byte totals + +```bash +sudo bpftrace -e ' +tracepoint:block:block_rq_complete +{ + $bytes = args->nr_sector * 512; + /* Cast the string to a char pointer and dereference to get the first byte */ + $c = *(uint8 *)args->rwbs; + $dev = args->dev; + + @read[$dev] += ($c == 82 ? $bytes : 0); /* 82 is "R" */ + @write[$dev] += ($c == 87 ? $bytes : 0); /* 87 is "W" */ + @total[$dev] += $bytes; + @seen[$dev] = 1; +} +' +``` + +# bpftrace: Block IO bandwidth tracking at 10ms intervals + +```bash +sudo bpftrace -e ' +tracepoint:block:block_rq_complete +{ + $bytes = args->nr_sector * 512; + /* Cast the string to a char pointer and dereference to get the first byte */ + $c = *(uint8 *)args->rwbs; + + /* Accumulate bytes for reads and writes */ + @read_bytes += ($c == 82 ? $bytes : 0); + @write_bytes += ($c == 87 ? $bytes : 0); +} + +interval:ms:10 +{ + /* Calculate bandwidth in MB/s using integer arithmetic */ + /* bytes in 10ms -> bytes_per_second = bytes * 100 */ + /* MB_per_second = (bytes * 100) / 1048576 */ + $read_bps = @read_bytes * 100; /* bytes per second */ + $write_bps = @write_bytes * 100; + $read_mbps = $read_bps / 1048576; /* MB per second (integer division) */ + $write_mbps = $write_bps / 1048576; + + /* Time in seconds (elapsed is in nanoseconds) */ + $time_sec = elapsed / 1000000000; + $time_ms_remainder = elapsed % 1000000000; + $time_ms = $time_ms_remainder / 1000000; + + printf("%d.%03d: Read: %d MB/s, Write: %d MB/s\n", + $time_sec, $time_ms, $read_mbps, $write_mbps); + + /* Reset counters for next interval */ + @read_bytes = 0; + @write_bytes = 0; +} + +END +{ + printf("\nTracing complete.\n"); +} +' +``` diff --git a/run.sh b/run.sh index 4438543..295484f 100755 --- a/run.sh +++ b/run.sh @@ -13,6 +13,7 @@ PROFILE_OUTPUT="perf.data" FLAMEGRAPH_OUTPUT="flamegraph.svg" SEPARATE_VALUES=false ENABLE_MEMORY_PROFILING=false +USE_ASYNC_IO=false PCM_MEMORY="/users/proteet/pcm/build/bin/pcm-memory" # Function to show usage @@ -88,6 +89,10 @@ while [[ $# -gt 0 ]]; do FLAMEGRAPH_OUTPUT="$2" shift 2 ;; + --use-async) + USE_ASYNC_IO=true + shift + ;; --help|-h) show_usage exit 0 @@ -151,6 +156,10 @@ if [ "$SEPARATE_VALUES" = true ]; then CMD_ARGS="$CMD_ARGS --separate-values" fi +if [ "$USE_ASYNC_IO" = true ]; then + CMD_ARGS="$CMD_ARGS --use-async" +fi + echo "Running sorter with parameters:" echo " File size: $FILE_SIZE" echo " Key size: $KEY_SIZE" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 656599c..ac7be30 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,12 +20,18 @@ endif() find_package(OpenMP REQUIRED) +find_library(LIBURING_LIBRARIES uring) +find_path(LIBURING_INCLUDE_DIRS liburing.h) + # Add executable add_executable(sorter main.cpp) # target_compile_options(sorter PRIVATE -fopenmp) target_link_libraries(sorter PUBLIC OpenMP::OpenMP_CXX) +target_include_directories(sorter PRIVATE ${LIBURING_INCLUDE_DIRS}) +target_link_libraries(sorter PRIVATE ${LIBURING_LIBRARIES}) + # Enable additional optimizations # target_compile_options(sorter PRIVATE -march=native -mtune=native) target_compile_options(sorter PRIVATE -mavx512f -mavx512vl) diff --git a/src/async_stages.h b/src/async_stages.h new file mode 100644 index 0000000..18dff09 --- /dev/null +++ b/src/async_stages.h @@ -0,0 +1,582 @@ +#pragma once + +/** + * Task 1: Fuse Phase 1 (read input) and Phase 2 (extract keys) using io_uring. + * + * Reads a run in small chunks via io_uring. When each chunk completes, keys + * are extracted into (key, index) pairs and the chunk is copied into the run + * buffer. Prefetching is done by submitting the next read as soon as a slot + * is free (queue depth controls in-flight reads). + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include "merge.h" +#include "io_uring_utils.h" +#include "key_value_pair.h" +#include "spdlog/spdlog.h" + + +constexpr uint64_t DEFAULT_READ_CHUNK_BYTES = 512 * 1024; // 512 KB + +constexpr uint32_t PREFETCH_DEPTH = 4; + +inline uint64_t align_up_block(uint64_t size) { + const uint64_t mask = io_uring_utils::BLOCK_ALIGN - 1; + return (size + mask) & ~mask; +} + +/** + * Extract keys from a chunk of raw record bytes into key_index_pairs. + * Record layout: key (8 bytes) then value. Keys are byte-swapped to match + * generate_key_index_pairs. + */ +template +void extract_keys_from_chunk( + const RecordType* input_buf, + uint64_t chunk_id, + uint64_t num_records_in_chunk, + KeyValuePair* key_index_pairs +) { + constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + static_assert(RecordType::KEY_LENGTH == 8, "Size of key should be 8 bytes"); + + const uint64_t num_records_per_full_chunk = DEFAULT_READ_CHUNK_BYTES / ELEM_SIZE; + uint64_t idx = chunk_id * num_records_per_full_chunk; + auto ptr = (RecordType*) input_buf + chunk_id * num_records_per_full_chunk; + + for (uint64_t i = 0; i < num_records_in_chunk; ++i) { + key_index_pairs[idx].key = __builtin_bswap64(ptr->key); + key_index_pairs[idx].set_value(&idx); + ptr++; + idx++; + } +} + +inline std::pair process_cqe(io_uring_cqe *cqe) { + int res = io_uring_utils::UringRing::cqe_result(cqe); + uint64_t user_data = io_uring_utils::UringRing::cqe_user_data(cqe); + return std::make_pair<>(user_data, res); +} + +// TODO(): Rewrite this as a class +/** + * Read one run from fd using io_uring in small chunks, and extract keys into + * key_index_pairs. Run data is written into run_buffer (must be at least + * run_size_bytes). key_index_pairs is resized and filled for the run. + * + * Uses a single io_uring ring with READ_QUEUE_DEPTH in-flight reads. + * read_chunk_bytes: size of each read chunk (will be aligned to BLOCK_ALIGN). + */ +template +void read_run_and_extract_keys(int fd, uint64_t thread_id, uint64_t run_size_bytes, + RecordType* records, + KeyValuePair* key_index_pairs +) { + using KeyIndexPair = KeyValuePair; + constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + + uint64_t io_processing_time = 0; + + assert(Config::BLOCK_SIZE_ALIGN % ELEM_SIZE == 0); + assert(run_size_bytes % ELEM_SIZE == 0); + + uint64_t num_records = run_size_bytes / ELEM_SIZE; + uint64_t read_chunk_bytes = DEFAULT_READ_CHUNK_BYTES; + + uint64_t file_offset = thread_id * run_size_bytes; + uint64_t num_chunks = (run_size_bytes + read_chunk_bytes - 1) / read_chunk_bytes; + + io_uring_utils::UringRing ring(PREFETCH_DEPTH); + + auto start = std::chrono::high_resolution_clock::now(); + uint64_t chunk_id = 0; + uint64_t completed = 0; + uint32_t free_slots = PREFETCH_DEPTH; + + while (completed < num_chunks) { + while (chunk_id < num_chunks && free_slots > 0) { + uint64_t chunk_offset = chunk_id * DEFAULT_READ_CHUNK_BYTES; + auto *buf = records + chunk_offset / ELEM_SIZE; + uint64_t chunk_bytes = std::min(DEFAULT_READ_CHUNK_BYTES, run_size_bytes - chunk_offset); + uint64_t read_size = align_up_block(chunk_bytes); + + bool ok = ring.prepare_read(fd, (void*)buf, static_cast(read_size), + file_offset + chunk_offset, + chunk_id << 32); + if (!ok) { + spdlog::error("submit_read failed for chunk {}", chunk_id); + break; + } + free_slots--; + chunk_id++; + } + auto io_start = std::chrono::high_resolution_clock::now(); + ring.submit_and_wait(1); + + struct io_uring_cqe* cqe = nullptr; + if (!ring.wait_cqe(&cqe)) { + spdlog::error("wait_cqe failed"); + break; + } + + auto p = process_cqe(cqe); + uint32_t chunk_id = p.first >> 32; + if (p.second < 0) { + spdlog::error("read chunk {} failed: {}", chunk_id, p.second); + ring.mark_cqe_seen(cqe); + completed++; + continue; + } + free_slots++; + auto io_end = std::chrono::high_resolution_clock::now(); + + io_processing_time += std::chrono::duration_cast(io_end - io_start).count(); + + uint64_t done_chunk_offset = chunk_id * DEFAULT_READ_CHUNK_BYTES; + uint64_t done_chunk_bytes = std::min(DEFAULT_READ_CHUNK_BYTES, run_size_bytes - done_chunk_offset); + uint64_t num_records_chunk = done_chunk_bytes / ELEM_SIZE; + + extract_keys_from_chunk( + records, chunk_id, num_records_chunk, key_index_pairs + ); + ring.mark_cqe_seen(cqe); + completed++; + } + + auto end = std::chrono::high_resolution_clock::now(); + auto time_elapsed = std::chrono::duration_cast(end - start).count() / 1000.0f; + + spdlog::debug("Run {}: read+extract {} ms, {} chunks, io_processing time: {} ms", + thread_id, time_elapsed, num_chunks, io_processing_time/1000.0f); +} + +// template +// class InputReader { +// using KeyIndexPair = KeyValuePair; +// static constexpr uint32_t ELEM_SIZE = sizeof(RecordType); + +// int fd; +// int thread_idx; +// RecordType* records; +// KeyIndexPair* key_index_pairs; + +// public: +// explicit InputReader(int fd, int thread_idx, RecordType *records, KeyIndexPair *key_index_pairs) +// : fd(fd), thread_idx(thread_idx), records(records), key_index_pairs(key_index_pairs) { + +// } +// void run() { + +// } +// }; + +/** +* Accumulates values into in-memory buffers and writes them out in batches. This ensures overlap between cpu and io, while keeping the memory footprint low. +*/ +template +class ValueWriterPostSort { + using KeyIndexPair = KeyValuePair; + + int fd; // Intermediate file containing values + + int thread_idx; // Each thread writes to a non-overlapping portion of the file + + uint64_t values_per_chunk; // Number of values per thread + + uint8_t *input_buffer; // Buffer containing key-value pairs + + int run_idx; // This writer is for the i'th sorted run + + std::vector write_bufs; // Set of pre-allocated buffers used for writing out + + std::unique_ptr ring; + + KeyIndexPair* key_index_pairs; // Sorted key-index pairs + + static constexpr uint32_t NUM_SLOTS = 16; + + static constexpr uint32_t BATCH_SIZE = 1; + + static constexpr uint64_t WRITE_IO_BYTES = RecordType::VALUE_LENGTH * io_uring_utils::BLOCK_ALIGN; +public: + uint64_t io_processing_time_us; + + explicit ValueWriterPostSort(int fd, int thread_idx, uint64_t values_per_chunk, + uint8_t* input_buffer, int run_idx, + KeyIndexPair* key_index_pairs + ): thread_idx(thread_idx), values_per_chunk(values_per_chunk), + run_idx(run_idx), input_buffer(input_buffer), key_index_pairs(key_index_pairs), + io_processing_time_us(0ll) { + this->fd = dup(fd); + write_bufs.resize(NUM_SLOTS); + for (uint32_t i=0; i= 0) { + close(fd); + } + for (uint32_t i = 0; i < write_bufs.size(); i++) { + free(write_bufs[i]); + } + } + + /** Fills write_bufs[buf_slot] with values for key_index_pairs[start_index .. start_index+num_values-1]. */ + void write_values_to_buf(uint32_t buf_slot, uint64_t start_index) { + const uint64_t num_values = WRITE_IO_BYTES / RecordType::VALUE_LENGTH; + uint8_t *buf = (uint8_t*)write_bufs[buf_slot]; + for (uint64_t i = 0; i < num_values; i++) { + KeyIndexPair &kv = key_index_pairs[start_index + i]; + const void *value_ptr = input_buffer + kv.value * sizeof(RecordType) + RecordType::KEY_LENGTH; + std::memcpy(buf, value_ptr, RecordType::VALUE_LENGTH); + kv.value = (uint64_t) run_idx; + buf += RecordType::VALUE_LENGTH; + } + } + + void run() { + spdlog::debug("Start writing out values post sort"); + ring = std::make_unique(NUM_SLOTS); + + std::queue slots; + for (uint32_t i=0; iprepare_write(fd, write_bufs[slot], WRITE_IO_BYTES, file_offset, slot); + to_submit++; + next_write++; + file_offset += WRITE_IO_BYTES; + } + + auto io_start = std::chrono::high_resolution_clock::now(); + bool should_submit = slots.empty() || to_submit >= BATCH_SIZE || (next_write == num_writes); + if (should_submit) { + int nr = (next_write == num_writes) ? static_cast(num_writes - completed) : slots.empty(); + ring->submit_and_wait(nr); + to_submit = 0; + + struct io_uring_cqe* cqe = nullptr; + while (ring->peek_cqe(&cqe)) { + auto p = process_cqe(cqe); + uint32_t slot_id = p.first; + if (p.second < 0) { + spdlog::error("write chunk failed: {}", p.second); + } + slots.push(slot_id); + completed++; + ring->mark_cqe_seen(cqe); + } + } + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); + } + spdlog::debug("Done writing out values post sort"); + } +}; + +class AsyncValueReader { + enum BufState { + Empty, + IoCompleted, + WaitingForIO, + }; + + int fd; + uint64_t file_offset; // Offset within the file + uint64_t end_file_offset; + int reader_id; + + // TODO(): Maybe the number of buffers can be generalized?? + void *ptr[2]; + BufState states[2]; + uint64_t chunk_offset; // Offset within the in-memory buffer that is being used for reads + uint64_t read_chunk_size; + uint64_t value_length_bytes; // Size of individual values + uint64_t cur_buf_idx; + +public: + inline bool waiting_for_io() { + // spdlog::debug("cur_buf_idx of stream {}: {}", reader_id, cur_buf_idx); + return states[cur_buf_idx] != BufState::IoCompleted; + } + + /** logical_start_offset is the byte offset in the file where the logical stream starts. + * For O_DIRECT, the actual read offset is aligned down to BLOCK_ALIGN; the first buffer + * skips (logical_start_offset % BLOCK_ALIGN) bytes. */ + AsyncValueReader(int fd, uint64_t logical_start_offset, uint64_t value_length, uint64_t read_chunk_size, int reader_id): + fd(fd), value_length_bytes(value_length), reader_id(reader_id), read_chunk_size(read_chunk_size) { + uint64_t aligned_offset = (logical_start_offset / io_uring_utils::BLOCK_ALIGN) * io_uring_utils::BLOCK_ALIGN; + uint64_t initial_skip_bytes = logical_start_offset - aligned_offset; + file_offset = aligned_offset; + chunk_offset = initial_skip_bytes; + for (int i=0; i<2; i++) { + int ret = posix_memalign(&ptr[i], 4096, read_chunk_size); + assert(ret == 0); + memset(ptr[i], 0, read_chunk_size); + states[i] = BufState::Empty; + } + cur_buf_idx = 0; + } + + AsyncValueReader(const AsyncValueReader&) = delete; + AsyncValueReader& operator=(const AsyncValueReader&) = delete; + + ~AsyncValueReader() { + close(fd); + free(ptr[0]); + free(ptr[1]); + } + + inline void *get_next_value_fast() { + if (states[cur_buf_idx] != BufState::IoCompleted || chunk_offset >= read_chunk_size) { + return nullptr; + } + void *res = (uint8_t*) ptr[cur_buf_idx] + chunk_offset; + chunk_offset += value_length_bytes; + return res; + } + + inline bool need_submit() { + return states[cur_buf_idx] == BufState::IoCompleted || states[cur_buf_idx ^ 1] == BufState::Empty; + } + + inline void *get_next_value() { + if (states[cur_buf_idx] != BufState::IoCompleted) { + return nullptr; + } + if (chunk_offset >= read_chunk_size) { + states[cur_buf_idx] = BufState::Empty; + + cur_buf_idx = 1 ^ cur_buf_idx; + chunk_offset = 0ll; + if (states[cur_buf_idx] != BufState::IoCompleted) { + return nullptr; + } + } + void *res = (uint8_t*) ptr[cur_buf_idx] + chunk_offset; + chunk_offset += value_length_bytes; + return res; + } + + inline void process_io_completion(uint64_t user_data) { + int buf_idx = (int)(user_data & 1); + // spdlog::debug("buf_idx of completed IO: {}", buf_idx); + states[buf_idx] = IoCompleted; + } + + inline std::optional get_next_io() { + uint64_t next_buf_idx = (states[cur_buf_idx] == Empty) ? cur_buf_idx: cur_buf_idx ^ 1; + // uint64_t next_buf_idx = cur_buf_idx ^ 1; + if (states[next_buf_idx] == IoCompleted) { + return std::nullopt; + } + states[next_buf_idx] = WaitingForIO; + uint64_t user_data = (reader_id << 16) | next_buf_idx; + io_uring_utils::ReadTask task { + ptr[next_buf_idx], read_chunk_size, + fd, file_offset, user_data + }; + file_offset += read_chunk_size; + return task; + } +}; + + +template +class ValueWriterPostMerge { + using KeyIndexPair = KeyValuePair; + + static constexpr uint64_t WRITE_IO_BYTES = 120 * 1024; + + static constexpr uint64_t READ_IO_CHUNK = 120 * 1024; + + int out_fd; // Output file containing key-value pairs + + MergeTask *task; + + std::vector write_bufs; // Set of pre-allocated buffers used for writing out + + std::queue slots; + + std::unique_ptr ring; + + std::vector> readers; + + static constexpr uint64_t NUM_SLOTS = PREFETCH_DEPTH * 4; + + static constexpr uint32_t BATCH_SIZE = 1; +public: + uint64_t io_processing_time_us; + + explicit ValueWriterPostMerge(MergeTask *task, int out_fd, + std::vector &in_fds, std::vector &start_ptrs) + : task(task), io_processing_time_us(0ll) { + assert(in_fds.size() == start_ptrs.size()); + assert(in_fds.size() > 0 && "in_fds must not be empty"); + assert(WRITE_IO_BYTES % RecordType::VALUE_LENGTH == 0); + + this->out_fd = dup(out_fd); + + for (int i=0; istart_ptrs[i] - start_ptrs[i]) * RecordType::VALUE_LENGTH; + auto reader = std::make_unique(fd, logical_start, RecordType::VALUE_LENGTH, + READ_IO_CHUNK, i); + readers.push_back(std::move(reader)); + } + + write_bufs.resize(NUM_SLOTS); + for (uint32_t i=0; ipeek_cqe(&cqe)) { + ring->wait_cqe(&cqe); + } + + if ((cqe->user_data >> 32) < NUM_SLOTS) { + // spdlog::debug("Processing write completion"); + uint32_t slot = (cqe->user_data >> 32); + slots.push(slot); + } else { + int reader_idx = (cqe->user_data >> 16) & 0xffff; + // spdlog::debug("Reader idx: {}", reader_idx); + readers[reader_idx]->process_io_completion(cqe->user_data); + auto task = readers[reader_idx]->get_next_io(); + if (task.has_value()) { + task.value().user_data |= (NUM_SLOTS << 32); + ring->prepare_read(task.value()); + } + } + ring->mark_cqe_seen(cqe); + } + + void run() { + spdlog::debug("Start post-merge ops"); + uint32_t ring_num_entries = NUM_SLOTS + readers.size(); + ring = std::make_unique(ring_num_entries); + + for (uint32_t i=0; ioutput; + + for (auto &reader: readers) { + auto task = reader->get_next_io(); + if (task.has_value()) { + task.value().user_data |= (NUM_SLOTS << 32); + ring->prepare_read(task.value()); + } + } + ring->submit_and_wait(0); + uint64_t out_file_offset = 0ll; + + while (records_emitted < task->total_records_sorted) { + while (slots.empty()) { + auto io_start = std::chrono::high_resolution_clock::now(); + poll_completions(); + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); + } + uint32_t slot = slots.front(); + slots.pop(); + uint64_t num_records_in_batch = std::min(task->total_records_sorted - records_emitted, WRITE_IO_BYTES / sizeof(RecordType)); + RecordType *out_buf = reinterpret_cast(write_bufs[slot]); + uint64_t offset = 0; + + for (uint64_t i=0; ikey); + uint32_t stream_id = sorted_keys->value; + + assert(stream_id < readers.size()); + assert(readers[stream_id] != nullptr); + + void *value = readers[stream_id]->get_next_value_fast(); + if (value == nullptr) [[unlikely]] { + auto io_start = std::chrono::high_resolution_clock::now(); + // Having another call here just so that we can submit an io pre-emptively. This is necessary when switching from + // one buffer to the next. If the IO to the next buffer is complete, it won't refill the earlier buffer. + value = readers[stream_id]->get_next_value(); + if (readers[stream_id]->need_submit()) { + auto task = readers[stream_id]->get_next_io(); + assert(task.has_value() && "Task is null"); + task.value().user_data |= (NUM_SLOTS << 32); + ring->prepare_read(task.value()); + } + + while (readers[stream_id]->waiting_for_io()) { + poll_completions(); + } + ring->submit_and_wait(0); + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); + value = readers[stream_id]->get_next_value(); + } + assert(value != nullptr && "Value is null"); + std::memcpy(&out_buf[i].value, value, RecordType::VALUE_LENGTH); + sorted_keys++; + } + auto io_start = std::chrono::high_resolution_clock::now(); + ring->prepare_write(out_fd, write_bufs[slot], num_records_in_batch * sizeof(RecordType), + out_file_offset, (uint64_t)slot << 32); + ring->submit_and_wait(0); + + auto io_end = std::chrono::high_resolution_clock::now(); + io_processing_time_us += std::chrono::duration_cast(io_end - io_start).count(); + out_file_offset += num_records_in_batch * sizeof(RecordType); + records_emitted += num_records_in_batch; + } + spdlog::debug("End post-merge ops"); + } +}; diff --git a/src/config.h b/src/config.h index 57f77ea..a8770d5 100644 --- a/src/config.h +++ b/src/config.h @@ -25,6 +25,9 @@ struct Config { bool use_std_sort; + /** If true, use io_uring for phase 1+2 (read + extract keys). Requires liburing. */ + bool use_async{false}; + inline uint64_t num_runs() { return (file_size_bytes + run_size_bytes - 1) / run_size_bytes; } diff --git a/src/io_uring_utils.h b/src/io_uring_utils.h new file mode 100644 index 0000000..a3850d4 --- /dev/null +++ b/src/io_uring_utils.h @@ -0,0 +1,128 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace io_uring_utils { + +constexpr uint32_t BLOCK_ALIGN = 4096; + +constexpr bool POLL = false; + +struct ReadTask { + void *buf; + uint64_t bytes; + int fd; + uint64_t offset; + uint64_t user_data; +}; + +/** + * Per-thread io_uring context + */ +struct UringRing { + struct io_uring ring_; + uint32_t queue_depth_; + uint32_t pending_; // number of submitted but not yet reaped + + explicit UringRing(uint32_t queue_depth) + : queue_depth_(queue_depth), pending_(0) { + unsigned int flags = 0; + flags |= IORING_SETUP_DEFER_TASKRUN | IORING_SETUP_SINGLE_ISSUER; + if (POLL) { + flags |= IORING_SETUP_IOPOLL; + } + + int ret = io_uring_queue_init(queue_depth, &ring_, flags); + assert(ret >= 0 && "io_uring_queue_init failed"); + } + + ~UringRing() { + io_uring_queue_exit(&ring_); + } + + UringRing(const UringRing&) = delete; + UringRing& operator=(const UringRing&) = delete; + + struct io_uring* get() { return &ring_; } + + bool prepare_read(int fd, void* buf, uint32_t len, uint64_t file_offset, uint64_t user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) return false; + io_uring_prep_read(sqe, fd, buf, len, file_offset); + io_uring_sqe_set_data(sqe, reinterpret_cast(static_cast(user_data))); + io_uring_sqe_set_flags(sqe, 0); + pending_++; + return true; + } + + bool prepare_read(ReadTask &task) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) return false; + io_uring_prep_read(sqe, task.fd, task.buf, task.bytes, task.offset); + io_uring_sqe_set_data(sqe, reinterpret_cast(static_cast(task.user_data))); + io_uring_sqe_set_flags(sqe, 0); + pending_++; + return true; + } + + bool prepare_write(int fd, void *buf, uint32_t len, uint64_t file_offset, uint64_t user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) return false; + io_uring_prep_write(sqe, fd, buf, len, file_offset); + io_uring_sqe_set_data(sqe, reinterpret_cast(static_cast(user_data))); + io_uring_sqe_set_flags(sqe, 0); + pending_++; + return true; + } + + /** Submit previously prepared SQEs to the kernel. */ + int submit_and_wait(int wait_nr) { + int n = io_uring_submit_and_wait(&ring_, wait_nr); + return n; + } + + bool wait_cqe(struct io_uring_cqe** cqe_ptr) { + int ret = io_uring_wait_cqe(&ring_, cqe_ptr); + if (ret < 0) return false; + return true; + } + + /** + * Peek at a completion without removing it. Returns true if one is available. + */ + bool peek_cqe(struct io_uring_cqe** cqe_ptr) { + return io_uring_peek_cqe(&ring_, cqe_ptr) == 0; + } + + void mark_cqe_seen(struct io_uring_cqe* cqe) { + io_uring_cqe_seen(&ring_, cqe); + if (pending_ > 0) pending_--; + } + + static uint64_t cqe_user_data(struct io_uring_cqe* cqe) { + return static_cast(reinterpret_cast(io_uring_cqe_get_data(cqe))); + } + + static int cqe_result(struct io_uring_cqe* cqe) { + return cqe->res; + } +}; + +/** + * Allocate a buffer suitable for O_DIRECT and io_uring reads. + * Caller must free with aligned_free. + */ +inline void* aligned_alloc_read_buffer(uint64_t size) { + assert(size > 0 && (size % BLOCK_ALIGN) == 0); + void* p = nullptr; + int ret = posix_memalign(&p, BLOCK_ALIGN, size); + assert(ret == 0); + return p; +} + +} // namespace io_uring_utils diff --git a/src/main.cpp b/src/main.cpp index c037e8d..8a922e6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -21,6 +21,7 @@ struct ParsedArgs { uint32_t num_threads; bool separate_values; bool use_std_sort; + bool use_async; }; size_t parseSizeString(const std::string& sizeStr) { @@ -105,6 +106,7 @@ void printHelp(const char* program_name) { std::cout << " --memory-size Memory size with unit (default: 100M)\n"; std::cout << " --read-chunk-size Read chunk size with unit (default: 100M)\n"; std::cout << " --num-threads Number of threads for parallel sorting (default: 1)\n"; + std::cout << " --use-async Use io_uring\n"; std::cout << " --help, -h Show this help message\n"; std::cout << "\nSize units: B (bytes), K (KB), M (MB), G (GB)\n"; std::cout << "Examples: 1M, 512K, 2G, 1024B\n"; @@ -119,6 +121,7 @@ int parseArguments(int argc, char* argv[], ParsedArgs& args) { args.memory_size = parseSizeString("100M"); // 100MB default args.num_threads = 1; // 1 thread default args.separate_values = false; + args.use_async = false; // Parse command line arguments for (int i = 1; i < argc; i++) { @@ -153,6 +156,9 @@ int parseArguments(int argc, char* argv[], ParsedArgs& args) { else if (arg == "--use-std-sort") { args.use_std_sort = true; } + else if (arg == "--use-async") { + args.use_async = true; + } else { std::cerr << "Unknown argument: " << arg << std::endl; std::cerr << "Use --help for usage information." << std::endl; @@ -178,7 +184,7 @@ void pin_current_thread() { } int main(int argc, char* argv[]) { - spdlog::set_level(spdlog::level::info); + spdlog::set_level(spdlog::level::debug); // pin_current_thread(); ParsedArgs args; int parse_result = parseArguments(argc, argv, args); @@ -200,6 +206,7 @@ int main(int argc, char* argv[]) { config.intermediate_file_prefix = args.working_dir + "/intermediate-" + file_size_str; config.separate_values = args.separate_values; config.use_std_sort = args.use_std_sort; + config.use_async = args.use_async; if (args.key_size == 8 && args.value_size == 8) { Sorter> sorter(std::move(config)); diff --git a/src/merge.h b/src/merge.h index d342645..4f3cbf9 100644 --- a/src/merge.h +++ b/src/merge.h @@ -52,9 +52,9 @@ std::vector> create_tasks(std::vector void run_merge_avx_512(MergeTask *task, bool *result_sorted) { using Regtype = avx512; - using ItemType = RecordType; - static_assert(std::is_same_v); - static_assert(std::is_same_v); + using ItemType = KeyValue; + // static_assert(std::is_same_v); + // static_assert(std::is_same_v); int num_streams = task->start_ptrs.size(); std::vector end_ptrs; diff --git a/src/sorted_run.h b/src/sorted_run.h index e83ec75..1af016b 100644 --- a/src/sorted_run.h +++ b/src/sorted_run.h @@ -16,11 +16,6 @@ struct SortedRun { uint64_t num_elements; }; -enum ReaderState { - WaitingForIO, - Ready, -}; - template class SortedRunReader { static constexpr uint32_t ELEM_SIZE = sizeof(RecordType); @@ -49,7 +44,6 @@ class SortedRunReader { // uint32_t next_chunk; - // ReaderState state; public: // IoTask* get_next_io_task() { diff --git a/src/sorter.h b/src/sorter.h index 2222c13..3bb6723 100644 --- a/src/sorter.h +++ b/src/sorter.h @@ -22,12 +22,15 @@ #include +#include "io_uring_utils.h" #include "perf_utils.h" #include "key_value_pair.h" #include "sorted_run.h" #include "config.h" #include "merge.h" #include "read_values.h" +#include "async_stages.h" + #define _REENTRANT #include "ips4o.hpp" @@ -48,13 +51,14 @@ struct TimingInfo { float value_separation; float value_write_back; float value_write_back_post_merge; - float final_output_write; float create_intermediate_value_runs; float merge_in_memory; + float async_read_and_extract_keys; + float async_value_writer_post_sort; }; -void write_to_disk(void *buf, uint64_t file_offset, uint64_t bytes, int fd) { +inline void write_to_disk(void *buf, uint64_t file_offset, uint64_t bytes, int fd) { uint64_t offset = 0ll; while (bytes > 0) { uint64_t bytes_to_write = std::min(bytes, MAX_IO_CHUNK_SIZE); @@ -99,14 +103,13 @@ class Sorter { void in_place_std_sort(std::vector> &vec); void generate_key_index_pairs(std::vector &vec, - std::vector &key_index_pairs, - void* value_buffer, bool separate_values); + std::vector &key_index_pairs); void write_back_values(std::vector &original, std::vector &key_index_pairs, - void *value_buffer); + std::vector &result); - void sort_single_run(std::vector &run); + void sort_single_run(std::vector &run, std::vector &output); void generate_run_for_merge_sort(std::vector &run, void *sorted_values, std::vector &key_index_pairs, int run_id); @@ -114,6 +117,9 @@ class Sorter { void write_back_values_post_merge(std::vector &fds, std::vector> &tasks, std::vector> &key_index_pairs); + void write_back_values_post_merge_async(std::vector &fds, std::vector> &tasks, + std::vector> &key_index_pairs); + public: Sorter(Config &&config) { this->config = std::move(config); @@ -124,24 +130,35 @@ class Sorter { void sort(); - void print_timing_stats() { + void print_timing_stats_sync() { spdlog::info("Read input: {} ms", timing_info.input_read); - spdlog::info("Read merge phase: {} ms", timing_info.merge_read); - spdlog::info("Write sorted runs to disk: {} ms", timing_info.intermediate_write); - spdlog::info("Write output to disk: {} ms", timing_info.output_write); - spdlog::info("Sorting time: {} ms", timing_info.sort_time); spdlog::info("Key-value separation: {} ms", timing_info.value_separation); + spdlog::info("Sorting time: {} ms", timing_info.sort_time); spdlog::info("Value write back (for one-pass sort): {} ms", timing_info.value_write_back); - spdlog::info("Creation of intermediate value runs: {} ms", timing_info.create_intermediate_value_runs); + spdlog::info("Creation of intermediate value runs (for merge sort): {} ms", timing_info.create_intermediate_value_runs); + spdlog::info("Write sorted runs to disk: {} ms", timing_info.intermediate_write); + spdlog::info("Read merge phase: {} ms", timing_info.merge_read); spdlog::info("In-memory merge: {} ms", timing_info.merge_in_memory); + spdlog::info("Write output to disk: {} ms", timing_info.output_write); spdlog::info("Write back values (after merge sort): {} ms", timing_info.value_write_back_post_merge); - spdlog::info("Write final output to disk (after merge sort): {} ms", timing_info.final_output_write); + } + + void print_timing_stats() { + if (config.use_async) { + spdlog::info("Read and extract keys: {} ms", timing_info.async_read_and_extract_keys); + spdlog::info("Sorting time: {} ms", timing_info.sort_time); + spdlog::info("Accumulate values and write intermediate runs to disk: {} ms", timing_info.async_value_writer_post_sort); + spdlog::info("In-memory merge: {} ms", timing_info.merge_in_memory); + spdlog::info("Write back values (after merge sort): {} ms", timing_info.value_write_back_post_merge); + } else { + print_timing_stats_sync(); + } } }; template void Sorter::generate_key_index_pairs(std::vector &vec, - std::vector &key_index_pairs, void* value_buffer, bool separate_values) { + std::vector &key_index_pairs) { spdlog::debug("Start separating values from keys"); auto start = std::chrono::high_resolution_clock::now(); #pragma omp parallel for num_threads(config.num_threads) @@ -149,10 +166,6 @@ void Sorter::generate_key_index_pairs(std::vector &vec, key_index_pairs[i].key = __builtin_bswap64(vec[i].key); key_index_pairs[i].set_value(&i); uint64_t offset = i * RecordType::VALUE_LENGTH; - - if (separate_values) { - std::memcpy((uint8_t*)value_buffer + offset, &vec[i].value, RecordType::VALUE_LENGTH); - } } auto end = std::chrono::high_resolution_clock::now(); timing_info.value_separation += std::chrono::duration_cast(end - start).count() / 1000.0f; @@ -163,18 +176,18 @@ void Sorter::generate_key_index_pairs(std::vector &vec, template void Sorter::write_back_values(std::vector &original, std::vector &key_index_pairs, - void *value_buffer) { + std::vector &result) { int miss_fd = init_perf_counter(PERF_COUNT_HW_CACHE_MISSES); int ref_fd = init_perf_counter(PERF_COUNT_HW_CACHE_REFERENCES); spdlog::debug("Start writing back values"); auto start = std::chrono::high_resolution_clock::now(); - // TODO(): Maybe unroll loop?? + #pragma omp parallel for num_threads(config.num_threads) for (uint64_t i=0; i(end - start).count() / 1000.0f; @@ -328,20 +341,15 @@ void Sorter::sort_independently(std::vector -void Sorter::sort_single_run(std::vector &run) { +void Sorter::sort_single_run(std::vector &run, std::vector &output) { if (config.separate_values) { std::vector key_index_pairs(run.size()); - void *value_buffer; - int ret = posix_memalign(&value_buffer, 64, RecordType::VALUE_LENGTH * run.size()); - assert(ret == 0); - memset(value_buffer, 0, RecordType::VALUE_LENGTH * run.size()); - generate_key_index_pairs(run, key_index_pairs, value_buffer, true); + generate_key_index_pairs(run, key_index_pairs); in_place_sort(key_index_pairs); - write_back_values(run, key_index_pairs, value_buffer); + write_back_values(run, key_index_pairs, output); - spdlog::info("is sorted: {}", std::is_sorted(run.begin(), run.end())); - free(value_buffer); + spdlog::info("is sorted: {}", std::is_sorted(output.begin(), output.end())); } else { in_place_sort(run); } @@ -356,13 +364,13 @@ void Sorter::generate_run_for_merge_sort( key_index_pairs.resize(run.size()); memset(key_index_pairs.data(), 0, run.size() * sizeof(KeyIndexPair)); - generate_key_index_pairs(run, key_index_pairs, nullptr, false); + generate_key_index_pairs(run, key_index_pairs); in_place_sort(key_index_pairs); auto start = std::chrono::high_resolution_clock::now(); #pragma omp parallel for num_threads(config.num_threads) for (int i=0; i::sort() { uint64_t num_runs = config.num_runs(); if (num_runs == 1) { auto v = read_input_chunk(0); - sort_single_run(v); - write_output_chunk(v.data(), config.file_size_bytes); + std::vector output(v.size()); + memset((void*) output.data(), 0, v.size() * ELEM_SIZE); + sort_single_run(v, output); + write_output_chunk(output.data(), config.file_size_bytes); return; } @@ -397,21 +407,84 @@ void Sorter::sort() { std::vector> key_index_pairs(num_runs); std::vector fds; + std::vector v; + v.resize(config.run_size_bytes / ELEM_SIZE); + memset(v.data(), 0, config.run_size_bytes); + void *sorted_values; + int ret = posix_memalign(&sorted_values, 4096, RecordType::VALUE_LENGTH * v.size()); + assert(ret == 0); + memset(sorted_values, 0, RecordType::VALUE_LENGTH * v.size()); + for (int i=0; i( + read_fd, j, config.run_size_bytes / config.num_threads, + record_ptr, key_index_ptr); + } + auto end_read_extract = std::chrono::high_resolution_clock::now(); + timing_info.async_read_and_extract_keys += std::chrono::duration_cast(end_read_extract - start_read_extract).count() / 1000.0f; + in_place_sort(key_index_pairs[i]); + + std::string file_name = config.intermediate_file_prefix + "-chunk-" + std::to_string(i); + int write_fd = open(file_name.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_DIRECT, 0644); + assert(write_fd != -1); + + posix_fallocate64(write_fd, 0, RecordType::VALUE_LENGTH * v.size()); + + uint64_t values_per_chunk = v.size() / config.num_threads; + assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0); + std::vector>> writers; + + for (int j = 0; j < config.num_threads; j++) { + KeyIndexPair* key_index_buf = key_index_pairs[i].data() + j * values_per_chunk; + auto writer = std::make_unique>( + write_fd, j, values_per_chunk, reinterpret_cast(v.data()), i, key_index_buf + ); + writers.push_back(std::move(writer)); + } + + auto start_writers = std::chrono::high_resolution_clock::now(); + #pragma omp parallel for num_threads(config.num_threads) + for (int j=0; jrun(); + } + auto end_writers = std::chrono::high_resolution_clock::now(); + timing_info.async_value_writer_post_sort += std::chrono::duration_cast(end_writers - start_writers).count() / 1000.0f; + + auto total_io_processing_time = 0ll; + for (int j=0; jio_processing_time_us; + } + + spdlog::info("Average IO processing time in post-sort step: {} ms", total_io_processing_time / (1000.0f * config.num_threads)); + fds.push_back(write_fd); + spdlog::debug("Done async read and key extraction"); + } } + free(sorted_values); std::vector> tasks; merge(key_index_pairs, config.run_size_bytes / sizeof(RecordType), &tasks); - write_back_values_post_merge(fds, tasks, key_index_pairs); + if (config.use_async) { + write_back_values_post_merge_async(fds, tasks, key_index_pairs); + } else { + write_back_values_post_merge(fds, tasks, key_index_pairs); + } } template @@ -555,7 +628,7 @@ void Sorter::write_back_values_post_merge(std::vector &fds, close(fd); } auto write_end = std::chrono::high_resolution_clock::now(); - timing_info.final_output_write += std::chrono::duration_cast(write_end - write_start).count() / 1000.0f; + timing_info.output_write += std::chrono::duration_cast(write_end - write_start).count() / 1000.0f; for (size_t i = 0; i < output_ptrs.size(); i++) { free(output_ptrs[i]); @@ -563,3 +636,39 @@ void Sorter::write_back_values_post_merge(std::vector &fds, spdlog::debug("Done writing back values post merge"); } + +template +void Sorter::write_back_values_post_merge_async(std::vector &fds, + std::vector> &tasks, + std::vector> &key_index_pairs) { + std::vector start_ptrs; + for (auto &v: key_index_pairs) { + start_ptrs.push_back(v.data()); + } + + std::vector>> writers; + for (int i=0; i>(&tasks[i], out_fd, fds, start_ptrs); + writers.push_back(std::move(writer)); + close(out_fd); + } + + auto start = std::chrono::high_resolution_clock::now(); + #pragma omp parallel for num_threads(config.num_threads) + for (int i=0; irun(); + } + + auto total_io_processing_time = 0ll; + for (int i=0; iio_processing_time_us; + } + auto end = std::chrono::high_resolution_clock::now(); + spdlog::info("Average IO processing time in post-merge step: {} ms", total_io_processing_time / (1000.0f * config.num_threads)); + timing_info.value_write_back_post_merge = std::chrono::duration_cast(end - start).count() / 1000.0f; +} \ No newline at end of file diff --git a/tools/plot_bpftrace_bw.py b/tools/plot_bpftrace_bw.py new file mode 100644 index 0000000..09bb453 --- /dev/null +++ b/tools/plot_bpftrace_bw.py @@ -0,0 +1,151 @@ +import argparse +import sys +import re +import matplotlib.pyplot as plt + +""" +Parse bpftrace output and plot read/write bandwidth over time. + +bpftrace output format: +.: Read: MB/s, Write: MB/s +Example: 1.234: Read: 100 MB/s, Write: 50 MB/s +""" + +def parse_bpftrace(file): + """ + Parse bpftrace bandwidth output. + + Args: + file: File object or stdin + + Returns: + Tuple of (times, read_bandwidths, write_bandwidths) + times are in seconds, bandwidths are in MB/s + """ + times = [] + read_bandwidths = [] + write_bandwidths = [] + + # Pattern to match: "1.234: Read: 100 MB/s, Write: 50 MB/s" + # bpftrace uses %d.%03d format, so milliseconds are always 3 digits + pattern = r'(\d+)\.(\d{3}):\s+Read:\s+(\d+)\s+MB/s,\s+Write:\s+(\d+)\s+MB/s' + + for line in file: + line = line.strip() + if not line or line.startswith("Tracing complete"): + continue + + match = re.match(pattern, line) + if match: + time_sec = int(match.group(1)) + time_ms = int(match.group(2)) + read_bw = int(match.group(3)) + write_bw = int(match.group(4)) + + # Convert to seconds (e.g., 1.234 -> 1.234 seconds) + time_total = time_sec + time_ms / 1000.0 + + times.append(time_total) + read_bandwidths.append(read_bw) + write_bandwidths.append(write_bw) + + return times, read_bandwidths, write_bandwidths + +def plot_bandwidth(times, read_bandwidths, write_bandwidths, output_file=None, input_file=None, start_time=None, end_time=None): + """ + Plot read and write bandwidth over time. + + Args: + times: List of timestamps (in seconds) + read_bandwidths: List of read bandwidths (in MB/s) + write_bandwidths: List of write bandwidths (in MB/s) + output_file: Output file path for the plot (if None, generates default name) + input_file: Input file path (for default filename generation) + start_time: Start time in seconds to clip the plot (None = no clipping) + end_time: End time in seconds to clip the plot (None = no clipping) + """ + if not times: + print("No data to plot.") + return + + # Filter data based on start_time and end_time + filtered_times = [] + filtered_read = [] + filtered_write = [] + + for i, t in enumerate(times): + if start_time is not None and t < start_time: + continue + if end_time is not None and t > end_time: + continue + filtered_times.append(t) + filtered_read.append(read_bandwidths[i]) + filtered_write.append(write_bandwidths[i]) + + if not filtered_times: + print("No data to plot after clipping.") + return + + # Calculate total bandwidth (read + write) + filtered_total = [r + w for r, w in zip(filtered_read, filtered_write)] + + plt.figure(figsize=(12, 6)) + + # Convert times to relative time (milliseconds from start) + plot_start_time = filtered_times[0] + relative_times = [(t - plot_start_time) * 1000 for t in filtered_times] # Convert to ms + + # Plot total bandwidth first with dashed line and lower z-order so read/write are on top + plt.plot(relative_times, filtered_total, label='Total', color='green', linewidth=1, + linestyle='--', alpha=0.7, zorder=1) + # Plot read and write on top with higher z-order + plt.plot(relative_times, filtered_read, label='Read', color='blue', linewidth=1, zorder=2) + plt.plot(relative_times, filtered_write, label='Write', color='red', linewidth=1, zorder=2) + + plt.xlabel('Time (ms)', fontsize=12) + plt.ylabel('Bandwidth (MB/s)', fontsize=12) + plt.title('Disk I/O Bandwidth Over Time (from bpftrace)', fontsize=14) + plt.legend(fontsize=11) + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + # Generate default filename if not provided + if output_file is None: + if input_file: + base_name = input_file.rsplit('.', 1)[0] if '.' in input_file else input_file + output_file = f"{base_name}_bandwidth.png" + else: + output_file = "bpftrace_bandwidth.png" + + plt.savefig(output_file, dpi=300, bbox_inches='tight') + print(f"Plot saved to {output_file}") + plt.close() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Parse bpftrace bandwidth output and plot read/write bandwidth over time.") + parser.add_argument("--file", type=str, help="Path to bpftrace output file (default: stdin)") + parser.add_argument("--output", type=str, help="Output file path for the plot (default: auto-generated based on input file)") + parser.add_argument("--start", type=float, help="Start time in seconds to clip the plot (default: no clipping)") + parser.add_argument("--end", type=float, help="End time in seconds to clip the plot (default: no clipping)") + args = parser.parse_args() + + if args.file: + with open(args.file, "r") as f: + times, read_bandwidths, write_bandwidths = parse_bpftrace(f) + else: + times, read_bandwidths, write_bandwidths = parse_bpftrace(sys.stdin) + + if times: + original_count = len(times) + plot_bandwidth(times, read_bandwidths, write_bandwidths, args.output, args.file, args.start, args.end) + if args.start is not None or args.end is not None: + # Count filtered points + filtered_count = sum(1 for t in times + if (args.start is None or t >= args.start) and + (args.end is None or t <= args.end)) + print(f"Processed {original_count} data points ({filtered_count} after clipping)") + else: + print(f"Processed {original_count} data points") + else: + print("No bandwidth data found in input.") diff --git a/tools/plot_disk_bw.py b/tools/plot_disk_bw.py new file mode 100644 index 0000000..a4facf3 --- /dev/null +++ b/tools/plot_disk_bw.py @@ -0,0 +1,199 @@ +import argparse +import sys +import numpy as np +import matplotlib.pyplot as plt +from collections import defaultdict + +""" +Biosnoop output is in the following format: +TIME(s) COMM PID DISK T SECTOR BYTES QUE(ms) LAT(ms) +""" +def parse_biosnoop(file, pid): + latencies = [] + io_sizes = [] + trace_data = [] # Store trace data: (sector, io_size, queued_time, latency) + io_events = [] # Store IO events with time: (time_s, io_type, bytes) + + for line in file: + line = line.strip() + if not line or line.startswith("TIME(") or line.startswith("--"): + continue + parts = line.split() + if len(parts) < 8: + continue + try: + time_s = float(parts[0]) # TIME column (in seconds) + line_pid = int(parts[2]) + comm = parts[1] + io_type = parts[4] # T column: R for read, W for write + que_ms = float(parts[-2]) # QUE column + lat_ms = float(parts[-1]) # LAT column + io_size = int(parts[-3]) # BYTES column + # Extract sector number - typically in the 6th column + sector = int(parts[5]) + except (ValueError, IndexError): + continue + if line_pid == pid: + if timing_type == "queued": + latencies.append(que_ms) + elif timing_type == "block_device": + latencies.append(lat_ms) + elif timing_type == "total": + latencies.append(lat_ms + que_ms) + io_sizes.append(io_size) + # Store trace data: sector, io_size, queued_time, latency + trace_data.append((sector, io_size, que_ms, lat_ms)) + # Store IO event with time and type for bandwidth calculation + io_events.append((time_s, io_type, io_size)) + return latencies, io_sizes, trace_data, io_events + +def print_percentiles(latencies): + if not latencies: + print(f"No I/O events found.") + return + percentiles = [50, 70, 90, 99] + values = np.percentile(latencies, percentiles) + + for p, v in zip(percentiles, values): + print(f" p{p}: {v:.3f}") + print(f"Mean: {np.mean(latencies)}") + print(f" count: {len(latencies)} events") + +def calculate_bandwidth(io_events, interval_ms): + """ + Calculate read and write bandwidth for each time interval. + + Args: + io_events: List of tuples (time_s, io_type, bytes) + interval_ms: Time interval in milliseconds + + Returns: + Tuple of (time_intervals, read_bandwidths, write_bandwidths) + Bandwidths are in MB/s + """ + if not io_events: + return [], [], [] + + # Convert interval from ms to seconds + interval_s = interval_ms / 1000.0 + + # Find time range + times = [event[0] for event in io_events] + min_time = min(times) + max_time = max(times) + + # Create time bins + num_intervals = int(np.ceil((max_time - min_time) / interval_s)) + time_intervals = [] + read_bytes = defaultdict(float) + write_bytes = defaultdict(float) + + # Group IO events by time interval + for time_s, io_type, bytes_count in io_events: + # Calculate which interval this event belongs to + interval_idx = int((time_s - min_time) / interval_s) + interval_start = min_time + interval_idx * interval_s + + if interval_start not in time_intervals: + time_intervals.append(interval_start) + + # Accumulate bytes by type + if io_type.upper() == 'R': + read_bytes[interval_start] += bytes_count + elif io_type.upper() == 'W': + write_bytes[interval_start] += bytes_count + + # Sort time intervals + time_intervals = sorted(set(time_intervals)) + + # Calculate bandwidth in MB/s for each interval + read_bandwidths = [] + write_bandwidths = [] + + for interval_start in time_intervals: + read_bw = (read_bytes[interval_start] / (1024 * 1024)) / interval_s # MB/s + write_bw = (write_bytes[interval_start] / (1024 * 1024)) / interval_s # MB/s + read_bandwidths.append(read_bw) + write_bandwidths.append(write_bw) + + return time_intervals, read_bandwidths, write_bandwidths + +def plot_bandwidth(time_intervals, read_bandwidths, write_bandwidths, interval_ms, output_file=None, pid=None, input_file=None): + """ + Plot read and write bandwidth over time. + + Args: + time_intervals: List of interval start times (in seconds) + read_bandwidths: List of read bandwidths (in MB/s) + write_bandwidths: List of write bandwidths (in MB/s) + interval_ms: Time interval in milliseconds (for title) + output_file: Output file path for the plot (if None, generates default name) + pid: Process ID (for default filename generation) + input_file: Input file path (for default filename generation) + """ + if not time_intervals: + print("No data to plot.") + return + + plt.figure(figsize=(12, 6)) + + # Convert time intervals to relative time (seconds from start) + start_time = time_intervals[0] + relative_times = [(t - start_time) * 1000 for t in time_intervals] # Convert to ms + + plt.plot(relative_times, read_bandwidths, label='Read', color='blue', linewidth=2) + plt.plot(relative_times, write_bandwidths, label='Write', color='red', linewidth=2) + + plt.xlabel('Time (ms)', fontsize=12) + plt.ylabel('Bandwidth (MB/s)', fontsize=12) + plt.title(f'Disk I/O Bandwidth Over Time (Interval: {interval_ms} ms)', fontsize=14) + plt.legend(fontsize=11) + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + # Generate default filename if not provided + if output_file is None: + if input_file: + base_name = input_file.rsplit('.', 1)[0] if '.' in input_file else input_file + output_file = f"{base_name}_bandwidth_pid{pid}_interval{interval_ms}ms.png" + else: + output_file = f"bandwidth_pid{pid}_interval{interval_ms}ms.png" + + plt.savefig(output_file, dpi=300, bbox_inches='tight') + print(f"Plot saved to {output_file}") + plt.close() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Parse biosnoop-bpfcc output and plot bandwidth over time.") + parser.add_argument("--pid", type=int, required=True, help="PID to filter") + parser.add_argument("--file", type=str, help="Path to biosnoop output file (default: stdin)") + parser.add_argument("--interval", type=float, required=True, help="Time interval in milliseconds for bandwidth calculation") + parser.add_argument("--output", type=str, help="Output file path for the plot (default: auto-generated based on input file and PID)") + parser.add_argument("--timing-type", type=str, default="total", choices=["queued", "block_device", "total"], + help="Type of latency to calculate (default: total)") + args = parser.parse_args() + + global timing_type + timing_type = args.timing_type + + if args.file: + with open(args.file, "r") as f: + latencies, io_sizes, trace_data, io_events = parse_biosnoop(f, args.pid) + else: + latencies, io_sizes, trace_data, io_events = parse_biosnoop(sys.stdin, args.pid) + + # Calculate and plot bandwidth + if io_events: + time_intervals, read_bandwidths, write_bandwidths = calculate_bandwidth(io_events, args.interval) + plot_bandwidth(time_intervals, read_bandwidths, write_bandwidths, args.interval, args.output, args.pid, args.file) + else: + print("No I/O events found for plotting.") + + # Print latency statistics if available + if latencies: + print(f"\n{args.timing_type.replace('_', ' ').title()} latency percentiles (ms):") + print_percentiles(latencies) + + print(f"\nIO sizes percentiles:") + print_percentiles(io_sizes) \ No newline at end of file