feat: add S3/OSS/HTTP extension support for cloud object storage #179
BingqingLyu wants to merge 15 commits into alibaba:main
Conversation
Review Summary by Qodo

Add S3/OSS/HTTP extension support for cloud object storage

Walkthrough

Description
• Adds comprehensive cloud object storage support through a new S3 extension module enabling read-only access to AWS S3, Alibaba Cloud OSS, and HTTP/HTTPS endpoints
• **S3/OSS Filesystem**: Implements S3FileSystem wrapper around Arrow's S3 filesystem with URI parsing, glob pattern expansion, and configurable credentials (explicit keys, environment variables, anonymous access)
• **HTTP/HTTPS Filesystem**: Implements HTTPFileSystem using libcurl with Range request support, SSL/TLS verification, and custom header configuration
• **Configuration System**: Centralized options for S3 (region, endpoint, credentials, timeouts) and HTTP (bearer token, SSL verification, proxy settings)
• **VFS Integration**: Registers S3, OSS, HTTP, and HTTPS protocols in the global FileSystemRegistry with proper interface adaptation
• **Comprehensive Testing**: Unit tests for URI parsing, configuration validation, and end-to-end integration tests accessing public datasets from AWS S3, Alibaba OSS, and HTTPS endpoints
• **Python Bindings**: E2E tests for data loading from cloud storage with row/column count and data type validation
• **Build System**: CMake configuration for S3 extension with Arrow S3 filesystem and libcurl dependencies
• **CI/CD Integration**: Extension tests added to GitHub workflows for automated validation
• **Documentation**: User guide covering installation, credential modes, configuration options, and query examples

Diagram
flowchart LR
A["S3/OSS URIs<br/>HTTP/HTTPS URLs"] -->|"Parse & Configure"| B["S3FileSystem<br/>HTTPFileSystem"]
B -->|"Implement VFS Interface"| C["FileSystemRegistry"]
C -->|"Register Protocols"| D["s3://, oss://<br/>http://, https://"]
B -->|"Adapt to Arrow"| E["Arrow FileSystem"]
E -->|"Access Cloud Storage"| F["AWS S3<br/>Alibaba OSS<br/>HTTP Endpoints"]
G["S3Options<br/>HTTPOptions"] -->|"Configure"| B
H["Unit & Integration<br/>Tests"] -->|"Validate"| B
File Changes
1. extension/s3/tests/s3_test.cpp
Code Review by Qodo
1. Hardcoded scheme dispatch implemented
// Register S3/OSS filesystem factories in the global VFS registry
static void RegisterS3Provider() {
  auto* vfs = neug::main::MetadataRegistry::getVFS();

  // Register for both s3:// and oss:// schemes
  vfs->Register("s3", neug::extension::s3::CreateS3FileSystem);
  vfs->Register("oss", neug::extension::s3::CreateS3FileSystem);

  LOG(INFO) << "[s3 extension] S3FileSystem registered for schemes: s3, oss";
}

// Register HTTP/HTTPS filesystem factories in the global VFS registry
static void RegisterHTTPProvider() {
  auto* vfs = neug::main::MetadataRegistry::getVFS();

  // Register for both http:// and https:// schemes
  vfs->Register("http", neug::extension::http::CreateHTTPFileSystem);
  vfs->Register("https", neug::extension::http::CreateHTTPFileSystem);
1. Hardcoded scheme dispatch implemented 📎 Requirement gap ⚙ Maintainability
The new remote filesystem support hardcodes URI scheme parsing/dispatch (via a custom VFS registry and bespoke URI parsers) instead of using Arrow’s arrow::fs::FileSystemFromUri() dispatch mechanism. This violates the requirement to rely on Arrow’s URI dispatch for scheme-based filesystem resolution.
Agent Prompt
## Issue description
Scheme-based filesystem resolution is implemented via custom string parsing and a custom registry, but the compliance requirement mandates using Arrow’s URI dispatch (`arrow::fs::FileSystemFromUri()` or an Arrow-supported equivalent).
## Issue Context
- Current implementation registers providers for `s3`, `oss`, `http`, `https` and uses custom URI parsing (`S3URIComponents::parse`, `HTTPURIComponents::parse`) instead of Arrow URI dispatch.
- Arrow’s dispatch should be used wherever possible for scheme-to-filesystem resolution.
## Fix Focus Areas
- src/utils/file_sys/file_system.cc[68-97]
- extension/s3/src/s3_extension.cpp[29-47]
- extension/s3/src/s3_filesystem.cc[141-184]
- extension/s3/src/http_filesystem.cc[46-90]
T getOptionWithEnv(const reader::options_t& options,
                   const reader::Option<T>& opt,
                   std::initializer_list<const char*> option_keys,
                   std::initializer_list<const char*> env_keys) {
  // Priority 1: schema.options - check all key aliases
  for (const char* key : option_keys) {
    auto it = options.find(key);
    if (it != options.end() && !it->second.empty()) {
      // Use the Option's parser to handle type conversion
      reader::options_t temp;
      temp.emplace(opt.getKey(), it->second);
      return opt.get(temp);
    }
  }
2. Empty endpoint override ignored 🐞 Bug ≡ Correctness
getOptionWithEnv() skips schema.options entries with empty values, so setting ENDPOINT_OVERRIDE="" does not override environment endpoints and can unintentionally route AWS S3 reads to an env-configured OSS/S3-compatible endpoint. This contradicts the test’s own comment that an empty ENDPOINT_OVERRIDE “Force[s] default AWS S3 endpoint”, and can cause incorrect runtime behavior in shared environments.
Agent Prompt
### Issue description
`getOptionWithEnv()` ignores explicitly-provided empty-string option values, so callers cannot clear env-provided endpoint/region (e.g., to force default AWS S3) by setting `ENDPOINT_OVERRIDE=""`.
### Issue Context
The S3 tests and expected behavior assume an empty override should take precedence over environment variables.
### Fix Focus Areas
- extension/s3/src/s3_options.cc[40-53]
- extension/s3/src/s3_options.cc[138-158]
### Suggested fix
- Treat **presence** of any alias key in `schema.options` as higher priority than env vars, even if the value is empty.
- For string options (endpoint/region), remove the `!it->second.empty()` guard so explicit empty values are honored.
- Keep env fallback only when none of the option keys are present.
- Add/adjust a unit test to validate: when env has `OSS_ENDPOINT` but schema.options has `ENDPOINT_OVERRIDE=""`, the resolved endpoint is empty and AWS defaults are used.
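The presence-beats-environment rule from the suggested fix can be sketched independently of the extension's actual types. This is a minimal illustration, assuming a plain string map in place of `reader::options_t`; the function name and the env-var handling here are hypothetical, not the real `getOptionWithEnv` API:

```cpp
#include <cstdlib>
#include <initializer_list>
#include <map>
#include <string>

// Resolve a string option: any present alias key in the options map wins
// over environment variables, even when its value is an empty string.
// An explicit ENDPOINT_OVERRIDE="" thus clears an env-provided endpoint.
std::string ResolveStringOption(const std::map<std::string, std::string>& options,
                                std::initializer_list<const char*> option_keys,
                                std::initializer_list<const char*> env_keys,
                                const std::string& default_value) {
  // Priority 1: explicit schema options -- presence alone takes precedence.
  for (const char* key : option_keys) {
    auto it = options.find(key);
    if (it != options.end()) {
      return it->second;  // may be empty: caller then falls back to AWS defaults
    }
  }
  // Priority 2: environment variables, only when no option key is present.
  for (const char* key : env_keys) {
    if (const char* value = std::getenv(key)) {
      return value;
    }
  }
  return default_value;
}
```

The only behavioral difference from the code under review is the dropped `!it->second.empty()` guard: the lookup succeeds on key presence, so an empty override is honored instead of silently deferring to the environment.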
extension/s3/src/http_filesystem.cc
Outdated
arrow::Status HTTPRandomAccessFile::ReadRange(int64_t offset, int64_t length, void* buffer) {
  if (closed_) {
    return arrow::Status::Invalid("File is closed");
  }

  // Setup GET request with HTTP Range header (RFC 7233)
  curl_easy_reset(curl_handle_);
  SetupCURLHandle(curl_handle_);
  curl_easy_setopt(curl_handle_, CURLOPT_URL, url_.c_str());

  // Set Range header: "bytes=offset-end"
  // IMPORTANT: CURLOPT_RANGE expects "start-end" format without "bytes="
  // CURL will automatically add the "Range: bytes=" prefix
  std::string range_value = std::to_string(offset) + "-" +
                            std::to_string(offset + length - 1);
  curl_easy_setopt(curl_handle_, CURLOPT_RANGE, range_value.c_str());

  // Setup direct write to buffer
  std::pair<void*, size_t> write_info{buffer, static_cast<size_t>(length)};
  curl_easy_setopt(curl_handle_, CURLOPT_WRITEFUNCTION, WriteCallbackDirect);
  curl_easy_setopt(curl_handle_, CURLOPT_WRITEDATA, &write_info);

  // Perform request
  CURLcode res = curl_easy_perform(curl_handle_);

  if (res != CURLE_OK) {
    return arrow::Status::IOError("HTTP Range request failed: " +
                                  std::string(curl_easy_strerror(res)));
  }

  // Verify response code
  // 200 = OK (server doesn't support Range, sent full file)
  // 206 = Partial Content (server supports Range, sent requested range)
  long response_code = 0;
  curl_easy_getinfo(curl_handle_, CURLINFO_RESPONSE_CODE, &response_code);

  if (response_code == 206) {
    // Success: server sent the requested range
    // Check how much data was actually written
    if (write_info.second > 0) {
      // Buffer wasn't completely filled - server sent less data than requested
      LOG(WARNING) << "HTTP Range request returned less data than expected. "
                   << "Requested: " << length << " bytes, "
                   << "Received: " << (length - write_info.second) << " bytes";
    }
    return arrow::Status::OK();
  } else if (response_code == 200) {
    // Server doesn't support Range requests and sent the full file
    // This is a critical error because we can't fit the full file in our buffer
    return arrow::Status::IOError(
        "Server doesn't support HTTP Range requests (returned 200 instead of 206). "
        "Cannot read file efficiently without Range support.");
  } else {
    return arrow::Status::IOError("HTTP Range request failed with status " +
                                  std::to_string(response_code));
  }
}

arrow::Result<int64_t> HTTPRandomAccessFile::Tell() const {
  if (closed_) {
    return arrow::Status::Invalid("File is closed");
  }
  return position_;
}

arrow::Result<int64_t> HTTPRandomAccessFile::GetSize() {
  if (closed_) {
    return arrow::Status::Invalid("File is closed");
  }
  return file_size_;
}

arrow::Status HTTPRandomAccessFile::Seek(int64_t position) {
  if (closed_) {
    return arrow::Status::Invalid("File is closed");
  }
  if (position < 0) {
    return arrow::Status::Invalid("Negative seek position");
  }
  position_ = position;
  return arrow::Status::OK();
}

arrow::Result<int64_t> HTTPRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
  if (closed_) {
    return arrow::Status::Invalid("File is closed");
  }

  auto status = ReadRange(position, nbytes, out);
  if (!status.ok()) {
    return status;
  }

  return nbytes;
}
3. HTTP ReadAt misreports bytes 🐞 Bug ≡ Correctness
HTTPRandomAccessFile::ReadRange() logs a short response (received < requested) but still returns OK, and ReadAt() always returns the requested nbytes even when fewer bytes were written. This can feed Arrow readers buffers containing uninitialized/incorrect data, causing corrupt reads or parsing failures.
Agent Prompt
### Issue description
`HTTPRandomAccessFile` may successfully return from a range request even when fewer bytes were received than requested, and `ReadAt()` reports the requested size rather than actual bytes read.
### Issue Context
Arrow dataset/parquet readers rely on `RandomAccessFile::ReadAt` semantics; misreporting byte counts can cause corrupted buffers and undefined behavior.
### Fix Focus Areas
- extension/s3/src/http_filesystem.cc[339-394]
- extension/s3/src/http_filesystem.cc[422-453]
### Suggested fix
- Compute `bytes_received = length - write_info.second` and:
- If `bytes_received == 0` and the server indicates EOF/416, return 0 (or an appropriate Arrow status).
- If `bytes_received < length`, either return `bytes_received` (preferred) or return an IOError to avoid propagating partial data as complete.
- Update `ReadAt(position, nbytes, void*)` to return `bytes_received` instead of `nbytes`.
- Update `ReadAt(position, nbytes)` (buffer-returning overload) to return a buffer sized to `bytes_received` (e.g., allocate `nbytes`, then `Slice(0, bytes_received)`), or allocate exactly `bytes_received` when known.
- Add handling for `nbytes == 0` to avoid constructing an invalid range like `offset-(offset-1)`.
// Timeouts
auto connect_timeout_it = options_.find(HTTPConfigOptionKeys::kConnectTimeout);
int connect_timeout = HTTPConfigDefaults::kConnectTimeoutDefault;
if (connect_timeout_it != options_.end()) {
  connect_timeout = std::stoi(connect_timeout_it->second);
}
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, connect_timeout);

auto request_timeout_it = options_.find(HTTPConfigOptionKeys::kRequestTimeout);
int request_timeout = HTTPConfigDefaults::kRequestTimeoutDefault;
if (request_timeout_it != options_.end()) {
  request_timeout = std::stoi(request_timeout_it->second);
}
curl_easy_setopt(curl, CURLOPT_TIMEOUT, request_timeout);
4. HTTP timeout parsing can throw 🐞 Bug ☼ Reliability
HTTPRandomAccessFile::SetupCURLHandle() uses std::stoi on user-provided timeout values without catching exceptions. Invalid values can throw std::exception types that are not caught by HTTPFileSystem::OpenInputFile() (it only catches neug::exception::Exception), potentially terminating the process.
Agent Prompt
### Issue description
Parsing HTTP timeout options with `std::stoi` can throw uncaught `std::exception` types, which are not handled by the current exception conversion path.
### Issue Context
Timeout values come from `FileSchema.options` (user input). Invalid input must not crash the process.
### Fix Focus Areas
- extension/s3/src/http_filesystem.cc[294-307]
- extension/s3/src/http_filesystem.cc[614-622]
### Suggested fix
- Wrap `std::stoi` calls in try/catch and throw a `THROW_INVALID_ARGUMENT_EXCEPTION(...)` (or return an Arrow Status) with a clear message indicating the bad key/value.
- Optionally extend `OpenInputFile()` to also catch `const std::exception&` and convert it to `arrow::Status::IOError` so unexpected std exceptions don’t terminate the process.
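One way to wrap the `std::stoi` calls, as a hedged sketch (the helper name and the out-parameter signaling are illustrative; the real fix might instead throw `THROW_INVALID_ARGUMENT_EXCEPTION` as the prompt suggests):

```cpp
#include <stdexcept>
#include <string>

// Parse a user-supplied timeout value, falling back to the default and
// signaling the caller instead of letting std::invalid_argument or
// std::out_of_range escape into code that only catches neug exceptions.
int ParseTimeoutOrDefault(const std::string& value, int default_seconds, bool* ok) {
  try {
    size_t consumed = 0;
    int parsed = std::stoi(value, &consumed);
    if (consumed != value.size() || parsed < 0) {
      *ok = false;  // trailing junk (e.g. "5s") or negative timeout
      return default_seconds;
    }
    *ok = true;
    return parsed;
  } catch (const std::exception&) {  // invalid_argument or out_of_range
    *ok = false;
    return default_seconds;
  }
}
```

Checking `consumed` against the full string length also rejects partially numeric input that bare `std::stoi` would silently accept.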
if(BUILD_TEST)
  add_extension_test(NAME s3
    TEST_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/tests/s3_test.cpp
              ${CMAKE_CURRENT_SOURCE_DIR}/tests/http_test.cpp
    EXTRA_LINK_LIBS arrow_dataset_objlib ${ARROW_PARQUET_LIB}
  )
endif()
5. s3_extension_test missing curl link 🐞 Bug ≡ Correctness
The s3_extension_test executable compiles HTTP filesystem sources that call libcurl, but the test target does not link against CURL_LIBRARIES. This will typically cause undefined-reference linker failures for curl symbols when building tests.
Agent Prompt
### Issue description
`s3_extension_test` compiles code that uses libcurl but does not link against libcurl.
### Issue Context
`add_extension_test()` compiles extension sources directly into the test binary, so the test must explicitly link all dependencies used by those sources.
### Fix Focus Areas
- extension/s3/CMakeLists.txt[34-40]
- extension/CMakeLists.txt[87-106]
### Suggested fix
- In `extension/s3/CMakeLists.txt`, update the `add_extension_test(NAME s3 ...)` call to include curl:
- Add `${CURL_LIBRARIES}` to `EXTRA_LINK_LIBS`.
- If needed on some platforms, also add `${CURL_INCLUDE_DIRS}` via `EXTRA_INCLUDE_DIRS`.
- Ensure the resulting `s3_extension_test` links successfully in CI.
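Assuming `find_package(CURL)` already populates `CURL_LIBRARIES`/`CURL_INCLUDE_DIRS` in this tree and that `add_extension_test` accepts an `EXTRA_INCLUDE_DIRS` keyword (as the prompt suggests), the amended call might look like this sketch, not a verified final diff:

```cmake
if(BUILD_TEST)
  add_extension_test(NAME s3
    TEST_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/tests/s3_test.cpp
              ${CMAKE_CURRENT_SOURCE_DIR}/tests/http_test.cpp
    EXTRA_INCLUDE_DIRS ${CURL_INCLUDE_DIRS}
    EXTRA_LINK_LIBS arrow_dataset_objlib ${ARROW_PARQUET_LIB} ${CURL_LIBRARIES}
  )
endif()
```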
- Fix empty endpoint override being ignored (comment #2)
  - Modified getOptionWithEnv to respect explicitly set empty values
  - Allows ENDPOINT_OVERRIDE="" to force default AWS S3 endpoint
- Fix HTTP ReadAt returning incorrect byte count (comment alibaba#3)
  - Changed ReadRange to return actual bytes read (Result<int64_t>)
  - Updated ReadAt to return actual bytes instead of requested nbytes
  - Handle zero-length reads and HTTP 416 responses
  - Use arrow::SliceBuffer for correct buffer sizing
- Fix HTTP timeout parsing uncaught exceptions (comment alibaba#4)
  - Added try-catch around std::stoi for timeout values
  - Added std::exception handler in OpenInputFile
- Fix s3_extension_test missing curl link library (comment alibaba#5)
  - Added CURL_LIBRARIES and CURL_INCLUDE_DIRS to test target
- S3TestEnvironment::SetUp() already initializes S3 subsystem globally
- Remove redundant EnsureS3Initialized() from S3ExtensionTest::SetUp()
- Each test case no longer needs to initialize S3 independently
PR Title: feat: add S3/OSS/HTTP extension support for cloud object storage
Reviewers:
Related Issues
closes #127
What does this PR do?
This PR adds comprehensive cloud object storage support to NeuG through a new S3 extension module. The extension provides read-only access to files stored on AWS S3, Alibaba Cloud OSS, and plain HTTP/HTTPS endpoints, enabling users to query data files directly from cloud storage without downloading them locally.
What changes in this PR?
New Features
- S3/OSS Filesystem Support (extension/s3/src/s3_filesystem.cc, include/s3_filesystem.h): S3FileSystem based on Arrow's S3 filesystem integration
- HTTP/HTTPS Filesystem Support (extension/s3/src/http_filesystem.cc, include/http_filesystem.h): HTTPFileSystem using libcurl for direct HTTP/HTTPS file access
- Configuration Options (include/s3_options.h, src/s3_options.cc, include/http_options.h)
- VFS Interface Integration (extension/s3/src/s3_extension.cpp): implements the neug::fsys::FileSystem interface with glob() and toArrowFileSystem() methods

Testing
- S3 Unit Tests (tests/s3_test.cpp)
- HTTP Unit Tests (tests/http_test.cpp)
- Integration Tests (tools/python_bind/tests/test_load.py, example/complex_test.py)

Documentation & CI
- Documentation (doc/source/extensions/load_s3.md, doc/source/extensions/index.md)
- CI/CD (.github/workflows/neug-extension-test.yml, .github/workflows/build-extensions.yml)

Build System
- extension/s3/CMakeLists.txt, CMakeLists.txt, cmake/BuildArrowAsThirdParty.cmake