Skip to content

Integrate async IO#2

Open
proteetpaul wants to merge 11 commits into
mainfrom
async-io
Open

Integrate async IO#2
proteetpaul wants to merge 11 commits into
mainfrom
async-io

Conversation

@proteetpaul
Copy link
Copy Markdown
Owner

This commit integrates async io via io_uring in the merge sort flow. Following changes have been made:
-First two stages are fused together, which involves reading records in chunks via io_uring and separating the keys and values in those chunks

  • The two stages immediately after in-place sort are also fused, that is values are copied over in small chunks and writes are issued
  • The final stages post merging of keys are fused, that is, intermediate values are read from disk, merged in memory, and chunks of records are written out.

@proteetpaul
Copy link
Copy Markdown
Owner Author

/gemini review

@gemini-code-assist
Copy link
Copy Markdown

Summary of Changes

Hello @proteetpaul, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the merge sort implementation to leverage io_uring for asynchronous I/O, aiming to improve overall sorting performance. By fusing several I/O and processing stages, the system can now perform operations like reading input, extracting keys, and writing intermediate/final values more efficiently, reducing bottlenecks and increasing throughput. The changes provide a configurable option to enable this new asynchronous path.

Highlights

  • Asynchronous I/O Integration: The pull request introduces io_uring to enable asynchronous I/O operations, significantly enhancing the performance of the merge sort flow by overlapping I/O with computation.
  • Stage Fusion for Efficiency: Several stages of the sorting process have been fused: initial record reading and key extraction, value copying and writing post-sort, and intermediate value reading, merging, and final writing. This reduces overhead and improves data flow.
  • New Configuration Option: A new command-line argument --use-async has been added, allowing users to enable or disable the io_uring based asynchronous I/O path at runtime.
  • Dedicated Async I/O Utilities: New utility classes and functions (UringRing, AsyncValueReader, ValueWriterPostSort, ValueWriterPostMerge) have been introduced to manage io_uring operations and streamline the asynchronous data handling across different sorting phases.
Changelog
  • src/CMakeLists.txt
    • Added find_library and find_path for liburing.
    • Linked liburing to the sorter executable, making io_uring available.
  • src/async_stages.h
    • New file introduced to define core asynchronous I/O logic.
    • Includes read_run_and_extract_keys for fusing input reading and key extraction using io_uring.
    • Introduces ValueWriterPostSort class for asynchronously writing values after in-place sorting.
    • Introduces AsyncValueReader class for asynchronous reading of values during merge.
    • Introduces ValueWriterPostMerge class for asynchronously writing merged key-value pairs to the final output.
  • src/config.h
    • Added a new boolean member use_async to the Config struct, defaulting to false, to control io_uring usage.
  • src/io_uring_utils.h
    • New file introduced to provide a wrapper for liburing operations.
    • Defines UringRing struct to manage io_uring context, including submission and completion queue operations.
    • Provides utility functions for preparing read and write requests and handling completion events.
    • Includes aligned_alloc_read_buffer for memory alignment suitable for O_DIRECT and io_uring.
  • src/main.cpp
    • Added use_async member to ParsedArgs struct.
    • Updated printHelp to include the new --use-async command-line option.
    • Modified parseArguments to parse the --use-async flag and set args.use_async.
    • Passed the use_async argument from ParsedArgs to the Config object.
  • src/sorted_run.h
    • Removed the ReaderState enum and related commented-out members from SortedRunReader, as its functionality is superseded by the new asynchronous reading mechanisms.
  • src/sorter.h
    • Included new headers io_uring_utils.h and async_stages.h.
    • Modified write_to_disk to be inline.
    • Added declarations for generate_run_for_merge_sort_after_keys and write_back_values_post_merge_async.
    • The sort() method was updated to conditionally use the new asynchronous I/O path for reading input, extracting keys, and writing intermediate values, as well as for the final merge output, based on the config.use_async flag.
    • Implemented write_back_values_post_merge_async to utilize ValueWriterPostMerge for asynchronous final output writes.
Activity
  • The pull request introduces significant changes to integrate io_uring for asynchronous I/O, affecting multiple core components of the sorting application.
  • New files src/async_stages.h and src/io_uring_utils.h have been added, containing the bulk of the new asynchronous logic and utility functions.
  • Existing files like src/sorter.h and src/main.cpp have been modified to incorporate and enable the new asynchronous workflow.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request integrates asynchronous I/O using io_uring to fuse several stages of the merge sort process, aiming to improve performance by overlapping I/O and CPU work. The changes are significant and introduce new asynchronous workflows for reading, key extraction, and writing data.

My review has identified several critical correctness and memory management issues in the new asynchronous code paths, particularly concerning pointer arithmetic, loop indexing, memory leaks, and use-after-free bugs. There are also some areas where the implementation could be improved for clarity and to complete the parallelization effort. Addressing these critical issues is essential for the feature to function correctly.

Comment thread src/async_stages.h
Comment thread src/async_stages.h
Comment thread src/sorter.h Outdated
Comment thread src/sorter.h Outdated
Comment thread src/async_stages.h Outdated
Comment thread src/async_stages.h Outdated
Comment thread src/sorter.h Outdated
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces asynchronous I/O using io_uring to improve performance, which is a great initiative. However, the current implementation has several critical correctness and performance issues that need to be addressed before merging. I've identified problems in memory management, loop control, data handling in async operations, and incorrect use of C++ features like Variable Length Arrays. My review includes detailed comments and suggestions to fix these issues. Please review them carefully.

Comment thread src/async_stages.h
Comment thread src/async_stages.h Outdated
Comment thread src/async_stages.h Outdated
Comment thread src/async_stages.h
Comment on lines +266 to +275
struct io_uring_cqe* cqe = nullptr;
if (!ring->wait_cqe(&cqe)) {
spdlog::error("wait_cqe failed");
break;
}
auto p = process_cqe(cqe);
uint32_t slot_id = (uint32_t) (p.first & 0xffff);
slots.push(slot_id);
int result = p.second;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block has several critical issues:

  1. The completed counter is never incremented inside the while loop, which will cause an infinite loop if num_writes > 0.
  2. ring->mark_cqe_seen(cqe) is never called after processing a completion, which will cause io_uring to stall.
  3. The slot_id is extracted incorrectly using p.first & 0xffff. It should be p.first >> 32 based on how user_data is packed.

These issues will prevent the run() method from working correctly.

                struct io_uring_cqe* cqe = nullptr;
                while (ring->peek_cqe(&cqe)) {
                    auto p = process_cqe(cqe);
                    uint32_t slot_id = p.first >> 32;
                    uint32_t write_id = p.first & 0xFFFFFFFF;
                    if (p.second < 0) {
                        spdlog::error("write chunk {} failed: {}", write_id, p.second);
                    }
                    slots.push(slot_id);
                    completed++;
                    ring->mark_cqe_seen(cqe);
                }
                if (!ring->wait_cqe(&cqe)) {
                    spdlog::error("wait_cqe failed");
                    break;
                }
                auto p = process_cqe(cqe);
                uint32_t slot_id = p.first >> 32;
                slots.push(slot_id);
                completed++;
                ring->mark_cqe_seen(cqe);

Comment thread src/async_stages.h Outdated
Comment on lines +414 to +431
void poll_completions() {
struct io_uring_cqe *cqe;
ring->wait_cqe(&cqe);
ring->peek_cqe(&cqe);
if ((cqe->user_data >> 32) < NUM_SLOTS) {
uint32_t slot = (cqe->user_data >> 32);
slots.push(slot);
} else {
int reader_idx = cqe->user_data & 0xffff;
readers[reader_idx]->process_io_completion(cqe->user_data);
auto task = readers[reader_idx]->get_next_io();
if (task.has_value()) {
task.value().user_data |= reader_idx;
ring->prepare_read(task.value());
}
}
ring->mark_cqe_seen(cqe);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This completion polling logic has a couple of issues:

  1. Calling wait_cqe and then peek_cqe is redundant. wait_cqe already blocks and returns a valid CQE.
  2. The logic to distinguish between read and write completions is flawed. The condition (cqe->user_data >> 32) < NUM_SLOTS will be true for both read completions (where user_data >> 32 is 0) and write completions. This will cause read completions to be misinterpreted as write completions, breaking the logic.

Comment thread src/async_stages.h Outdated
Comment thread src/sorter.h Outdated
Comment thread src/async_stages.h Outdated
Comment thread src/sorter.h Outdated
@proteetpaul
Copy link
Copy Markdown
Owner Author

/gemini review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces significant changes to integrate asynchronous I/O using io_uring, which is a great step towards improving performance. The new code is quite complex and introduces new modules for async operations. My review focuses on correctness and potential issues in the new async logic. I've found a few critical issues related to incorrect calculations, potential deadlocks, and bugs in the I/O handling logic that should be addressed. There are also some opportunities for refactoring to improve maintainability. Overall, this is a solid effort to leverage modern kernel features for I/O-intensive tasks.

Comment thread src/async_stages.h Outdated
Comment thread src/async_stages.h Outdated
Comment thread src/async_stages.h
Comment thread src/async_stages.h Outdated
Comment thread src/sorter.h
posix_fallocate64(write_fd, 0, RecordType::VALUE_LENGTH * v.size());

uint64_t values_per_chunk = v.size() / config.num_threads;
assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This assertion is incorrect. It's comparing values_per_chunk, which is a count of records, against io_uring_utils::BLOCK_ALIGN, which is an alignment size in bytes. The likely intent is to ensure that the total size of the values for the chunk is block-aligned. The assertion should be modified to check the size in bytes.

            assert((values_per_chunk * RecordType::VALUE_LENGTH) % io_uring_utils::BLOCK_ALIGN == 0);

Comment thread src/io_uring_utils.h
bool prepare_read(ReadTask &task) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (!sqe) return false;
io_uring_prep_read(sqe, task.fd, task.buf, task.bytes, task.offset);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The len parameter of io_uring_prep_read is an unsigned int (typically 32 bits), but you are passing task.bytes, which is a uint64_t. This could lead to silent truncation and incorrect behavior if task.bytes exceeds the maximum value of uint32_t. It's safer to perform an explicit cast to uint32_t to match the function signature and make the intent clear, similar to your other prepare_read overload.

Suggested change
io_uring_prep_read(sqe, task.fd, task.buf, task.bytes, task.offset);
io_uring_prep_read(sqe, task.fd, task.buf, static_cast<uint32_t>(task.bytes), task.offset);

@proteetpaul
Copy link
Copy Markdown
Owner Author

/gemini review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces asynchronous I/O using io_uring to optimize the merge sort pipeline, fusing several stages to improve performance. The implementation is comprehensive, with new classes to manage async reads and writes. While the overall approach is sound, I've identified a few critical issues concerning error handling and strict alignment assumptions that could lead to runtime failures or incorrect sorting results. Additionally, there are several opportunities to improve code clarity, robustness, and configurability. My review comments provide specific details and suggestions to address these points.

Comment thread src/async_stages.h
Comment on lines +132 to +137
if (p.second < 0) {
spdlog::error("read chunk {} failed: {}", chunk_id, p.second);
ring.mark_cqe_seen(cqe);
completed++;
continue;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The error handling for a failed read operation is insufficient. If a read fails, an error is logged, but the loop continues by incrementing completed. This will cause extract_keys_from_chunk to process an uninitialized or garbage-filled buffer, leading to incorrect behavior or a crash later in the process. A failed I/O operation at this stage should be treated as a fatal error, and the program should probably terminate.

Comment thread src/sorter.h
posix_fallocate64(write_fd, 0, RecordType::VALUE_LENGTH * v.size());

uint64_t values_per_chunk = v.size() / config.num_threads;
assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The assertion assert(values_per_chunk % io_uring_utils::BLOCK_ALIGN == 0) is too restrictive and will likely cause the application to crash for many common configurations. values_per_chunk depends on user-provided memory sizes and thread counts, which are not guaranteed to result in a value that is a multiple of the block alignment. The underlying ValueWriterPostSort requires this alignment for its I/O operations. The logic should be made more flexible to handle chunks that are not perfectly aligned, for instance by padding the last write operation if necessary.

Comment thread src/async_stages.h

int fd;
uint64_t file_offset; // Offset within the file
uint64_t end_file_offset;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The member variable end_file_offset is declared but never used within the AsyncValueReader class. It should be removed to improve code clarity and avoid confusion.

Comment thread src/async_stages.h
Comment on lines +443 to +456
if ((cqe->user_data >> 32) < NUM_SLOTS) {
// spdlog::debug("Processing write completion");
uint32_t slot = (cqe->user_data >> 32);
slots.push(slot);
} else {
int reader_idx = (cqe->user_data >> 16) & 0xffff;
// spdlog::debug("Reader idx: {}", reader_idx);
readers[reader_idx]->process_io_completion(cqe->user_data);
auto task = readers[reader_idx]->get_next_io();
if (task.has_value()) {
task.value().user_data |= (NUM_SLOTS << 32);
ring->prepare_read(task.value());
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for distinguishing between different types of I/O completions relies on magic numbers and bit manipulation of user_data (e.g., (cqe->user_data >> 32) < NUM_SLOTS, (cqe->user_data >> 16) & 0xffff). This approach is fragile and hard to maintain. Consider defining a struct for user_data or using an enum and helper functions to pack/unpack the data in a more readable and type-safe manner. For example:

struct CompletionData {
    uint32_t type; // e.g., 0 for write, 1 for read
    uint32_t id;   // e.g., slot_id or reader_idx
    // ... other data
};

Comment thread src/io_uring_utils.h Outdated

constexpr uint32_t BLOCK_ALIGN = 4096;

constexpr bool POLL = true;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding POLL = true enables IORING_SETUP_IOPOLL, which can be beneficial for low-latency devices but may lead to 100% CPU utilization on some systems or for certain workloads. This can be inefficient. It would be better to make this a runtime configuration option, allowing users to enable it only when appropriate for their hardware and use case.

Comment thread src/main.cpp

int main(int argc, char* argv[]) {
spdlog::set_level(spdlog::level::info);
spdlog::set_level(spdlog::level::debug);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logging level is hardcoded to debug. While useful for development, this will produce verbose output in a production environment, which might not be desirable and could have a minor performance impact. It's better to set the default level to info and provide a command-line argument to enable debug logging when needed.

Suggested change
spdlog::set_level(spdlog::level::debug);
spdlog::set_level(spdlog::level::info);

Comment thread src/merge.h
Comment on lines +55 to +57
using ItemType = KeyValue<i64, i64>;
// static_assert(std::is_same_v<decltype(ItemType::key), int64_t>);
// static_assert(std::is_same_v<decltype(ItemType::value), uint64_t>);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function run_merge_avx_512 is templated on RecordType, but ItemType is now hardcoded to KeyValue<i64, i64>, and the static_asserts that validated RecordType's properties have been commented out. This makes the function's template parameter misleading and reduces its generality. If this function is now only intended to work with types compatible with KeyValue<i64, i64>, consider adjusting the function signature to make this explicit, or restore the generic behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant