Integrate async IO#2
Conversation
|
/gemini review |
Summary of ChangesHello @proteetpaul, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the merge sort implementation to leverage Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request integrates asynchronous I/O using io_uring to fuse several stages of the merge sort process, aiming to improve performance by overlapping I/O and CPU work. The changes are significant and introduce new asynchronous workflows for reading, key extraction, and writing data.
My review has identified several critical correctness and memory management issues in the new asynchronous code paths, particularly concerning pointer arithmetic, loop indexing, memory leaks, and use-after-free bugs. There are also some areas where the implementation could be improved for clarity and to complete the parallelization effort. Addressing these critical issues is essential for the feature to function correctly.
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous I/O using io_uring to improve performance, which is a great initiative. However, the current implementation has several critical correctness and performance issues that need to be addressed before merging. I've identified problems in memory management, loop control, data handling in async operations, and incorrect use of C++ features like Variable Length Arrays. My review includes detailed comments and suggestions to fix these issues. Please review them carefully.
| struct io_uring_cqe* cqe = nullptr; | ||
| if (!ring->wait_cqe(&cqe)) { | ||
| spdlog::error("wait_cqe failed"); | ||
| break; | ||
| } | ||
| auto p = process_cqe(cqe); | ||
| uint32_t slot_id = (uint32_t) (p.first & 0xffff); | ||
| slots.push(slot_id); | ||
| int result = p.second; | ||
| } |
There was a problem hiding this comment.
This block has several critical issues:
- The
completedcounter is never incremented inside thewhileloop, which will cause an infinite loop ifnum_writes > 0. ring->mark_cqe_seen(cqe)is never called after processing a completion, which will causeio_uringto stall.- The
slot_idis extracted incorrectly usingp.first & 0xffff. It should bep.first >> 32based on howuser_datais packed.
These issues will prevent the run() method from working correctly.
struct io_uring_cqe* cqe = nullptr;
while (ring->peek_cqe(&cqe)) {
auto p = process_cqe(cqe);
uint32_t slot_id = p.first >> 32;
uint32_t write_id = p.first & 0xFFFFFFFF;
if (p.second < 0) {
spdlog::error("write chunk {} failed: {}", write_id, p.second);
}
slots.push(slot_id);
completed++;
ring->mark_cqe_seen(cqe);
}
if (!ring->wait_cqe(&cqe)) {
spdlog::error("wait_cqe failed");
break;
}
auto p = process_cqe(cqe);
uint32_t slot_id = p.first >> 32;
slots.push(slot_id);
completed++;
ring->mark_cqe_seen(cqe);| void poll_completions() { | ||
| struct io_uring_cqe *cqe; | ||
| ring->wait_cqe(&cqe); | ||
| ring->peek_cqe(&cqe); | ||
| if ((cqe->user_data >> 32) < NUM_SLOTS) { | ||
| uint32_t slot = (cqe->user_data >> 32); | ||
| slots.push(slot); | ||
| } else { | ||
| int reader_idx = cqe->user_data & 0xffff; | ||
| readers[reader_idx]->process_io_completion(cqe->user_data); | ||
| auto task = readers[reader_idx]->get_next_io(); | ||
| if (task.has_value()) { | ||
| task.value().user_data |= reader_idx; | ||
| ring->prepare_read(task.value()); | ||
| } | ||
| } | ||
| ring->mark_cqe_seen(cqe); | ||
| } |
There was a problem hiding this comment.
This completion polling logic has a couple of issues:
- Calling
wait_cqeand thenpeek_cqeis redundant.wait_cqealready blocks and returns a valid CQE. - The logic to distinguish between read and write completions is flawed. The condition
(cqe->user_data >> 32) < NUM_SLOTSwill be true for both read completions (whereuser_data >> 32is 0) and write completions. This will cause read completions to be misinterpreted as write completions, breaking the logic.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces significant changes to integrate asynchronous I/O using io_uring, which is a great step towards improving performance. The new code is quite complex and introduces new modules for async operations. My review focuses on correctness and potential issues in the new async logic. I've found a few critical issues related to incorrect calculations, potential deadlocks, and bugs in the I/O handling logic that should be addressed. There are also some opportunities for refactoring to improve maintainability. Overall, this is a solid effort to leverage modern kernel features for I/O-intensive tasks.
| posix_fallocate64(write_fd, 0, RecordType::VALUE_LENGTH * v.size()); | ||
|
|
||
| uint64_t values_per_chunk = v.size() / config.num_threads; | ||
| assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0); |
There was a problem hiding this comment.
This assertion is incorrect. It's comparing values_per_chunk, which is a count of records, against io_uring_utils::BLOCK_ALIGN, which is an alignment size in bytes. The likely intent is to ensure that the total size of the values for the chunk is block-aligned. The assertion should be modified to check the size in bytes.
assert((values_per_chunk * RecordType::VALUE_LENGTH) % io_uring_utils::BLOCK_ALIGN == 0);| bool prepare_read(ReadTask &task) { | ||
| struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); | ||
| if (!sqe) return false; | ||
| io_uring_prep_read(sqe, task.fd, task.buf, task.bytes, task.offset); |
There was a problem hiding this comment.
The len parameter of io_uring_prep_read is an unsigned int (typically 32 bits), but you are passing task.bytes, which is a uint64_t. This could lead to silent truncation and incorrect behavior if task.bytes exceeds the maximum value of uint32_t. It's safer to perform an explicit cast to uint32_t to match the function signature and make the intent clear, similar to your other prepare_read overload.
| io_uring_prep_read(sqe, task.fd, task.buf, task.bytes, task.offset); | |
| io_uring_prep_read(sqe, task.fd, task.buf, static_cast<uint32_t>(task.bytes), task.offset); |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous I/O using io_uring to optimize the merge sort pipeline, fusing several stages to improve performance. The implementation is comprehensive, with new classes to manage async reads and writes. While the overall approach is sound, I've identified a few critical issues concerning error handling and strict alignment assumptions that could lead to runtime failures or incorrect sorting results. Additionally, there are several opportunities to improve code clarity, robustness, and configurability. My review comments provide specific details and suggestions to address these points.
| if (p.second < 0) { | ||
| spdlog::error("read chunk {} failed: {}", chunk_id, p.second); | ||
| ring.mark_cqe_seen(cqe); | ||
| completed++; | ||
| continue; | ||
| } |
There was a problem hiding this comment.
The error handling for a failed read operation is insufficient. If a read fails, an error is logged, but the loop continues by incrementing completed. This will cause extract_keys_from_chunk to process an uninitialized or garbage-filled buffer, leading to incorrect behavior or a crash later in the process. A failed I/O operation at this stage should be treated as a fatal error, and the program should probably terminate.
| posix_fallocate64(write_fd, 0, RecordType::VALUE_LENGTH * v.size()); | ||
|
|
||
| uint64_t values_per_chunk = v.size() / config.num_threads; | ||
| assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0); |
There was a problem hiding this comment.
The assertion assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0) is too restrictive and will likely cause the application to crash for many common configurations. values_per_chunk depends on user-provided memory sizes and thread counts, which are not guaranteed to result in a value that is a multiple of the block alignment. The underlying ValueWriterPostSort requires this alignment for its I/O operations. The logic should be made more flexible to handle chunks that are not perfectly aligned, for instance by padding the last write operation if necessary.
|
|
||
| int fd; | ||
| uint64_t file_offset; // Offset within the file | ||
| uint64_t end_file_offset; |
| if ((cqe->user_data >> 32) < NUM_SLOTS) { | ||
| // spdlog::debug("Processing write completion"); | ||
| uint32_t slot = (cqe->user_data >> 32); | ||
| slots.push(slot); | ||
| } else { | ||
| int reader_idx = (cqe->user_data >> 16) & 0xffff; | ||
| // spdlog::debug("Reader idx: {}", reader_idx); | ||
| readers[reader_idx]->process_io_completion(cqe->user_data); | ||
| auto task = readers[reader_idx]->get_next_io(); | ||
| if (task.has_value()) { | ||
| task.value().user_data |= (NUM_SLOTS << 32); | ||
| ring->prepare_read(task.value()); | ||
| } | ||
| } |
There was a problem hiding this comment.
The logic for distinguishing between different types of I/O completions relies on magic numbers and bit manipulation of user_data (e.g., (cqe->user_data >> 32) < NUM_SLOTS, (cqe->user_data >> 16) & 0xffff). This approach is fragile and hard to maintain. Consider defining a struct for user_data or using an enum and helper functions to pack/unpack the data in a more readable and type-safe manner. For example:
struct CompletionData {
uint32_t type; // e.g., 0 for write, 1 for read
uint32_t id; // e.g., slot_id or reader_idx
// ... other data
};|
|
||
| constexpr uint32_t BLOCK_ALIGN = 4096; | ||
|
|
||
| constexpr bool POLL = true; |
There was a problem hiding this comment.
Hardcoding POLL = true enables IORING_SETUP_IOPOLL, which can be beneficial for low-latency devices but may lead to 100% CPU utilization on some systems or for certain workloads. This can be inefficient. It would be better to make this a runtime configuration option, allowing users to enable it only when appropriate for their hardware and use case.
|
|
||
| int main(int argc, char* argv[]) { | ||
| spdlog::set_level(spdlog::level::info); | ||
| spdlog::set_level(spdlog::level::debug); |
There was a problem hiding this comment.
The logging level is hardcoded to debug. While useful for development, this will produce verbose output in a production environment, which might not be desirable and could have a minor performance impact. It's better to set the default level to info and provide a command-line argument to enable debug logging when needed.
| spdlog::set_level(spdlog::level::debug); | |
| spdlog::set_level(spdlog::level::info); |
| using ItemType = KeyValue<i64, i64>; | ||
| // static_assert(std::is_same_v<decltype(ItemType::key), int64_t>); | ||
| // static_assert(std::is_same_v<decltype(ItemType::value), uint64_t>); |
There was a problem hiding this comment.
The function run_merge_avx_512 is templated on RecordType, but ItemType is now hardcoded to KeyValue<i64, i64>, and the static_asserts that validated RecordType's properties have been commented out. This makes the function's template parameter misleading and reduces its generality. If this function is now only intended to work with types compatible with KeyValue<i64, i64>, consider adjusting the function signature to make this explicit, or restore the generic behavior.
This commit integrates async io via io_uring in the merge sort flow. Following changes have been made:
-First two stages are fused together, which involves reading records in chunks via io_uring and separating the keys and values in those chunks