Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions extension/json/tests/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <vector>

#include <arrow/array.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/type.h>
#include "neug/compiler/common/case_insensitive_map.h"
Expand Down Expand Up @@ -100,7 +101,7 @@ class JsonTest : public ::testing::Test {
const std::string& csvFile, const std::vector<std::string>& columnNames,
const std::vector<std::shared_ptr<::common::DataType>>& columnTypes,
const common::case_insensitive_map_t<std::string>& options = {},
const std::vector<std::string>& skipColumns = {},
const std::vector<std::string>& projectColumns = {},
std::shared_ptr<::common::Expression> skipRows = nullptr) {
auto sharedState = std::make_shared<reader::ReadSharedState>();

Expand All @@ -120,7 +121,7 @@ class JsonTest : public ::testing::Test {
externalSchema.file = fileSchema;

sharedState->schema = std::move(externalSchema);
sharedState->skipColumns = skipColumns;
sharedState->projectColumns = projectColumns;
sharedState->skipRows = skipRows;

return sharedState;
Expand Down
16 changes: 8 additions & 8 deletions extension/parquet/tests/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ TEST_F(ParquetTest, TestIntegration_ColumnPruning) {
PARQUET_THROW_NOT_OK(
parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1));

// Set up shared state with skipColumns
// Set up shared state with projectColumns
auto sharedState = std::make_shared<reader::ReadSharedState>();
auto entrySchema = std::make_shared<reader::TableEntrySchema>();
entrySchema->columnNames = {"id", "name", "score", "grade"};
Expand All @@ -551,20 +551,20 @@ TEST_F(ParquetTest, TestIntegration_ColumnPruning) {
externalSchema.file = fileSchema;
sharedState->schema = std::move(externalSchema);

// Neug's column pruning: skip "name" column
sharedState->skipColumns = {"name"};
// Neug's column projection: id, score, grade (exclude "name")
sharedState->projectColumns = {"id", "score", "grade"};

auto reader = createParquetReader(sharedState);
auto localState = std::make_shared<reader::ReadLocalState>();
execution::Context ctx;
reader->read(localState, ctx);

// Verify extension translates skipColumns to Arrow projection
// Should have 3 columns (id, score, grade - "name" is skipped)
// Verify extension translates projectColumns to Arrow projection
// Should have 3 columns (id, score, grade - "name" is excluded)
EXPECT_EQ(ctx.col_num(), 3)
<< "Extension should translate Neug's skipColumns to Arrow column projection";
<< "Extension should translate Neug's projectColumns to Arrow column projection";
EXPECT_EQ(sharedState->columnNum(), 3)
<< "Extension should update columnNum after pruning";
<< "Extension should update columnNum after projection";
}

TEST_F(ParquetTest, TestIntegration_FilterPushdown) {
Expand Down Expand Up @@ -761,7 +761,7 @@ TEST_F(ParquetTest, TestIntegration_CombinedFilterAndProjection) {
externalSchema.entry = entrySchema;
externalSchema.file = fileSchema;
sharedState->schema = std::move(externalSchema);
sharedState->skipColumns = {"name"}; // Prune "name" column
sharedState->projectColumns = {"id", "score", "grade"}; // Exclude "name"
sharedState->skipRows = filterExpr; // Filter score > 90.0

auto reader = createParquetReader(sharedState);
Expand Down
13 changes: 8 additions & 5 deletions include/neug/compiler/function/table/bind_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,14 @@ struct NEUG_API TableFuncBindData {
: columns{other.columns},
numRows{other.numRows},
params(other.params),
columnSkips{other.columnSkips} {}
projectColumns{other.projectColumns} {}
TableFuncBindData& operator=(const TableFuncBindData& other) = delete;
virtual ~TableFuncBindData() = default;

common::idx_t getNumColumns() const { return columns.size(); }
void setColumnSkips(std::vector<bool> skips) {
columnSkips = std::move(skips);
void setProjectColumns(std::vector<std::string> projectColumns) {
this->projectColumns = std::move(projectColumns);
}
std::vector<bool> getColumnSkips() const;

std::shared_ptr<binder::Expression> getRowSkips() const { return rowSkips; }

Expand All @@ -90,8 +89,12 @@ struct NEUG_API TableFuncBindData {
return *common::neug_dynamic_cast<TARGET*>(this);
}

const std::vector<std::string>& getProjectColumns() const {
return projectColumns;
}

protected:
std::vector<bool> columnSkips;
std::vector<std::string> projectColumns;
std::shared_ptr<binder::Expression> rowSkips;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "logical_operator_visitor.h"
#include "neug/compiler/planner/operator/logical_operator.h"
#include "neug/compiler/planner/operator/logical_plan.h"

namespace neug {
namespace optimizer {

// ProjectIntoDataSourceOptimizer pushes a Projection directly into
// LogicalTableFunctionCall when the pattern is:
// LogicalTableFunctionCall -> LogicalProjection
// After rewrite, the Projection is removed and projected expressions are
// carried by LogicalTableFunctionCall::bindData->projectColumns.
class ProjectIntoDataSourceOptimizer : public LogicalOperatorVisitor {
public:
void rewrite(planner::LogicalPlan* plan);

private:
std::shared_ptr<planner::LogicalOperator> visitOperator(
const std::shared_ptr<planner::LogicalOperator>& op);
std::shared_ptr<planner::LogicalOperator> visitProjectionReplace(
std::shared_ptr<planner::LogicalOperator> op) override;

private:
planner::LogicalPlan* plan;
};

} // namespace optimizer
} // namespace neug
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class NEUG_API LogicalTableFunctionCall final : public LogicalOperator {
const function::TableFunction& getTableFunc() const { return tableFunc; }
function::TableFuncBindData* getBindData() const { return bindData.get(); }

void setColumnSkips(std::vector<bool> columnSkips) {
bindData->setColumnSkips(std::move(columnSkips));
void setProjectColumns(std::vector<std::string> projectColumns) {
bindData->setProjectColumns(std::move(projectColumns));
}

void setNodeMaskRoots(std::vector<std::shared_ptr<LogicalOperator>> roots) {
Expand Down
23 changes: 12 additions & 11 deletions include/neug/utils/reader/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ struct ReadOptions {
* @brief Template base class for building format-specific scan options
*
* This template class provides a generic interface for building scan options
* for different data formats. It handles column projection (skipColumns) and
* row filtering (skipRows) operations. Derived classes implement
* for different data formats. It handles column projection (projectColumns)
* and row filtering (skipRows) operations. Derived classes implement
* format-specific option building logic.
*
* @tparam T The type of options structure returned by build() and modified by
* skipColumns() and skipRows()
* projectColumns() and skipRows()
*/
template <class T>
class OptionsBuilder {
Expand All @@ -158,11 +158,11 @@ class OptionsBuilder {
virtual T build() const = 0;

/**
* @brief Applies column projection to exclude skipped columns
* @brief Applies column projection to include only specified columns
* @param options The options structure to modify
* @return true if column projection was successfully applied, false otherwise
*/
virtual bool skipColumns(T& options) { return false; }
virtual bool projectColumns(T& options) { return false; }

/**
* @brief Applies row filtering based on filter expressions
Expand Down Expand Up @@ -206,8 +206,9 @@ struct ArrowOptions {
* and batch size
* - File format: builds format-specific FileFormat via buildFileFormat()
*
* The skipColumns() method implements column pruning by setting the projection
* expression in scanOptions, excluding columns listed in state.skipColumns.
* The projectColumns() method implements column pruning by setting the
* projection expression in scanOptions to include only columns listed in
* state.projectColumns.
*
* The skipRows() method implements filter pushdown by converting
* common::Expression to arrow::compute::Expression and setting it as the
Expand Down Expand Up @@ -241,15 +242,15 @@ class ArrowOptionsBuilder : public OptionsBuilder<ArrowOptions> {
virtual ArrowOptions build() const override = 0;

/**
* @brief Applies column projection to exclude skipped columns
* @brief Applies column projection to include only specified columns
*
* Modifies the projection expression in options.scanOptions to exclude
* columns listed in state.skipColumns, implementing column pruning.
* Modifies the projection expression in options.scanOptions to include
* only columns listed in state.projectColumns, implementing column pruning.
*
* @param options The ArrowOptions to modify
* @return true if column projection was successfully applied, false otherwise
*/
virtual bool skipColumns(ArrowOptions& options) override;
virtual bool projectColumns(ArrowOptions& options) override;

/**
* @brief Applies row filtering based on filter expressions
Expand Down
18 changes: 5 additions & 13 deletions include/neug/utils/reader/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct ReadLocalState {
* operations. It includes:
* - Schema information: external table metadata (column names, types, file
* info)
* - Column pruning: list of columns to skip during projection
* - Column projection: list of columns to include in output
* - Filter pushdown: predicate expression for row filtering
*
* The columnNum() method calculates the effective number of columns after
Expand All @@ -98,27 +98,19 @@ struct ReadLocalState {
*/
struct ReadSharedState {
ExternalSchema schema;
std::vector<std::string> skipColumns;
std::vector<std::string> projectColumns;
std::shared_ptr<::common::Expression> skipRows;

/**
* @brief Get the number of columns after column pruning
* @return The number of columns remaining (not skipped) according to
* skipColumns
* @brief Get the number of columns after column projection
* @return The number of projected columns, or all columns if no projection
*/
int columnNum() {
if (!schema.entry) {
return 0;
}
const auto& allColumns = schema.entry->columnNames;
int total = static_cast<int>(allColumns.size());
for (const auto& column : allColumns) {
if (std::find(skipColumns.begin(), skipColumns.end(), column) !=
skipColumns.end()) {
--total;
}
}
return total;
return projectColumns.empty() ? allColumns.size() : projectColumns.size();
}
};

Expand Down
2 changes: 1 addition & 1 deletion proto/cypher_dml.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ message DataSource {
string extension_name = 1;
EntrySchema entry_schema = 4;
FileSchema file_schema = 5;
repeated string skip_columns = 6;
repeated string project_columns = 6;
common.Expression skip_rows = 7;
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
}

Expand Down
6 changes: 1 addition & 5 deletions src/compiler/binder/bind/read/bind_load_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "neug/compiler/binder/expression/expression_util.h"
#include "neug/compiler/binder/query/reading_clause/bound_load_from.h"
#include "neug/compiler/common/types/value/value.h"
#include "neug/compiler/function/table/scan_file_function.h"
#include "neug/compiler/parser/query/reading_clause/load_from.h"
#include "neug/compiler/parser/scan_source.h"
#include "neug/utils/exception/exception.h"
Expand Down Expand Up @@ -56,11 +57,6 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
auto boundScanSource = bindFileScanSource(
*source, loadFrom.getParsingOptions(), columnNames, columnTypes);
auto& scanInfo = boundScanSource->constCast<BoundTableScanSource>().info;
auto& bindData = scanInfo.bindData->cast<function::ScanFileBindData>();
auto& fileInfo = bindData.fileScanInfo;
// We support LOAD FROM by ArrowArrayContextColumn with BATCH_READ set to
// false.
fileInfo.options.insert({"BATCH_READ", common::Value::createValue(false)});
boundLoadFrom = std::make_unique<BoundLoadFrom>(scanInfo.copy());
} break;
Comment on lines 57 to 61
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

3. Load from batch_read regression 🐞 Bug ☼ Reliability

bindLoadFrom no longer forces batch_read/BATCH_READ=false for file sources, and the new safety is
implemented only in an optimizer pass gated by enablePlanOptimizer; with optimizer disabled,
batch_read defaults to true and LOAD FROM can produce ArrowStreamContextColumn in unsupported
pipelines.
Agent Prompt
### Issue description
The previous bind-time safety that forced `BATCH_READ/batch_read=false` for `LOAD FROM` file scans has been removed. The replacement guard lives in an optimizer pass that does not run when `enablePlanOptimizer=false`, allowing `batch_read` to fall back to its default (`true`) and produce `ArrowStreamContextColumn`.

### Issue Context
- `Optimizer::optimize` is gated by `enablePlanOptimizer`.
- `ReadOptions.batch_read` defaults to `true`.
- `ArrowReader::batch_read` emits `ArrowStreamContextColumn`, which is not generally supported.

### Fix Focus Areas
- src/compiler/binder/bind/read/bind_load_from.cpp[46-61]
- src/compiler/optimizer/optimizer.cpp[55-82]
- include/neug/utils/reader/options.h[122-126]

### What to change
- Reintroduce a bind-time override for `LOAD FROM` scans to force batch read off (e.g., set `BATCH_READ=false` / `batch_read=false` in the scan options map) so correctness does not depend on optimizer execution.
- Keep the optimizer-based guard as an additional safety net, but don’t rely on it for baseline correctness.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

default:
Expand Down
12 changes: 0 additions & 12 deletions src/compiler/function/table/bind_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,6 @@
namespace neug {
namespace function {

std::vector<bool> TableFuncBindData::getColumnSkips() const {
if (columnSkips
.empty()) { // If not specified, all columns should be scanned.
std::vector<bool> skips;
for (auto i = 0u; i < getNumColumns(); ++i) {
skips.push_back(false);
}
return skips;
}
return columnSkips;
}

bool TableFuncBindData::getIgnoreErrorsOption() const {
return common::CopyConstants::DEFAULT_IGNORE_ERRORS;
}
Expand Down
13 changes: 4 additions & 9 deletions src/compiler/gopt/g_query_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1173,15 +1173,10 @@ void GQueryConvertor::convertDataSource(
sourcePB->set_allocated_skip_rows(
exprConvertor->convert(*rowSkips, {}).release());
}
auto columnSkips = scanBindData->getColumnSkips();
if (scanBindData->columns.size() != columnSkips.size()) {
THROW_EXCEPTION_WITH_FILE_LINE("Each column should have a skip flag");
}
for (auto idx = 0; idx < scanBindData->columns.size(); idx++) {
if (columnSkips[idx]) {
sourcePB->mutable_skip_columns()->Add(
scanBindData->columns[idx]->rawName());
}
// Proto field is named skip_columns for historical reasons; it now carries
// the list of columns to project (include), not skip.
for (const auto& column : scanBindData->getProjectColumns()) {
sourcePB->add_project_columns(column);
}

auto physicalPB = std::make_unique<::physical::PhysicalOpr>();
Expand Down
3 changes: 2 additions & 1 deletion src/compiler/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ add_library(neug_optimizer
union_alias_map_optimizer.cpp
remove_subquery_as_join.cpp
common_pattern_reuse_optimizer.cpp
project_join_condition_optimizer.cpp)
project_join_condition_optimizer.cpp
project_into_data_source_optimizer.cpp)
add_dependencies(neug_optimizer neug_plan_proto)

set(ALL_OBJECT_FILES
Expand Down
4 changes: 4 additions & 0 deletions src/compiler/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "neug/compiler/optimizer/filter_push_down_pattern.h"
#include "neug/compiler/optimizer/flat_join_to_expand_optimizer.h"
#include "neug/compiler/optimizer/limit_push_down_optimizer.h"
#include "neug/compiler/optimizer/project_into_data_source_optimizer.h"
#include "neug/compiler/optimizer/project_join_condition_optimizer.h"
#include "neug/compiler/optimizer/projection_push_down_optimizer.h"
#include "neug/compiler/optimizer/remove_factorization_rewriter.h"
Expand Down Expand Up @@ -74,6 +75,9 @@ void Optimizer::optimize(
context->getClientConfig()->recursivePatternSemantic, context);
projectionPushDownOptimizer.rewrite(plan);

auto projectIntoDataSourceOptimizer = ProjectIntoDataSourceOptimizer();
projectIntoDataSourceOptimizer.rewrite(plan);

auto limitPushDownOptimizer = LimitPushDownOptimizer();
limitPushDownOptimizer.rewrite(plan);

Expand Down
Loading
Loading