Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/neug/compiler/gopt/g_expr_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class GExprConverter {
const binder::Expression& expr,
const std::vector<std::string>& schemaAlias);

std::unique_ptr<::common::Expression> convertToListFunc(
const binder::Expression& expr,
const std::vector<std::string>& schemaAlias);

std::unique_ptr<::common::Expression> convertCaseExpression(
const binder::CaseExpression& expr,
const std::vector<std::string>& schemaAlias);
Expand Down
13 changes: 11 additions & 2 deletions include/neug/compiler/gopt/g_scalar_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "neug/compiler/function/schema/vector_node_rel_functions.h"
#include "neug/compiler/function/string/vector_string_functions.h"
#include "neug/compiler/function/struct/vector_struct_functions.h"
#include "neug/utils/exception/exception.h"

namespace neug {
namespace gopt {
Expand All @@ -46,7 +47,8 @@ enum ScalarType {
LABEL,
PATTERN_EXTRACT, // startNode, endNode, nodes, rels
PROPERTIES, // properties(nodes(), 'name')
TO_ARRAY,
TO_LIST, // variable-length array whose elements all share one type
TO_TUPLE, // tuple type whose elements may have different types
UPPER,
LOWER,
REVERSE,
Expand Down Expand Up @@ -113,7 +115,14 @@ class GScalarType {
} else if (func.name == function::PropertiesFunction::name) {
return ScalarType::PROPERTIES;
} else if (func.name == function::ListCreationFunction::name) {
return ScalarType::TO_ARRAY;
const auto& type = expr.getDataType();
if (type.getLogicalTypeID() == common::LogicalTypeID::LIST) {
return ScalarType::TO_LIST;
} else if (type.getLogicalTypeID() == common::LogicalTypeID::STRUCT) {
return ScalarType::TO_TUPLE;
}
THROW_EXCEPTION_WITH_FILE_LINE("Invalid data type: " + type.toString() +
" for function: " + func.name);
} else if (func.name == function::UpperFunction::name) {
return ScalarType::UPPER;
} else if (func.name == function::LowerFunction::name) {
Expand Down
6 changes: 6 additions & 0 deletions include/neug/execution/common/types/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "neug/common/types.h"
#include "neug/execution/common/types/graph_types.h"
#include "neug/execution/utils/numeric_cast.h"
#include "neug/utils/property/list_view.h"

namespace neug {
class Property;
Expand Down Expand Up @@ -721,5 +722,10 @@ Value performCastToString(const Value& input);

void encode_value(const Value& val, Encoder& encoder);

// Convert a storage-layer ListView into an execution-layer Value::LIST.
// The ListView's underlying buffer must remain valid for the duration of
// this call (the resulting Value owns its data independently).
Value ListViewToValue(const neug::ListView& lv);

} // namespace execution
} // namespace neug
13 changes: 13 additions & 0 deletions include/neug/execution/expression/exprs/struct_expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,18 @@ class TupleExpr : public ExprBase {
std::vector<std::unique_ptr<ExprBase>> exprs_;
DataType type_;
};

// Expression node carrying a vector of child expressions and a DataType.
// NOTE(review): presumably evaluates its children into a single list value,
// mirroring TupleExpr -- the constructor and bind() are defined elsewhere;
// confirm against the implementation file.
class ListExpr : public ExprBase {
public:
// Takes ownership of the child expressions. `list_type` is presumably the
// DataType later reported by type() (constructor body not shown here).
ListExpr(std::vector<std::unique_ptr<ExprBase>>&& exprs, DataType list_type);
~ListExpr() override = default;
// Returns the DataType stored in this expression.
const DataType& type() const override { return type_; }
// Produces the bound form of this expression for the given storage and
// query parameters (implemented out of line).
std::unique_ptr<BindedExprBase> bind(const IStorageInterface* storage,
const ParamsMap& params) const override;

private:
// Owned child expressions.
std::vector<std::unique_ptr<ExprBase>> exprs_;
// Type returned by type().
DataType type_;
};
} // namespace execution
} // namespace neug
199 changes: 199 additions & 0 deletions include/neug/utils/property/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "neug/utils/file_utils.h"
#include "neug/utils/likely.h"
#include "neug/utils/mmap_array.h"
#include "neug/utils/property/list_view.h"
#include "neug/utils/property/property.h"
#include "neug/utils/property/types.h"
#include "neug/utils/serialization/out_archive.h"
Expand Down Expand Up @@ -427,6 +428,204 @@ class TypedColumn<std::string_view> : public ColumnBase {

using StringColumn = TypedColumn<std::string_view>;

// ---------------------------------------------------------------------------
// list_storage_item
// ---------------------------------------------------------------------------
// Index entry used by ListColumn. Wider than string_item (which has a 16-bit
// length field) to accommodate large list blobs.
struct list_storage_item {
  // Byte offset of the serialized blob inside the ListColumn data buffer.
  uint64_t offset;
  // Byte length of the serialized blob.
  uint32_t length;
  // Explicit tail padding; defaults to zero so the struct is exactly 16 bytes.
  uint32_t padding = 0;
};
// Lock in the layout: two 32-bit fields after one 64-bit field.
static_assert(sizeof(list_storage_item) == 16, "list_storage_item size must be 16 bytes");

// ---------------------------------------------------------------------------
// ListColumn
// ---------------------------------------------------------------------------
// Stores a column of list-typed property values. Each entry is a serialized
// binary blob produced by ListViewBuilder::finish_pod<T>() or
// ListViewBuilder::finish_varlen().
//
// Storage layout on disk (prefix = column name):
// <prefix>.items -- mmap_array<list_storage_item>: offset+length per entry
// <prefix>.data -- mmap_array<char>: packed blob storage
// <prefix>.pos -- uint64_t: committed write frontier in data buffer
//
// Reading:
// ListView lv = col.get_view(idx);
// // access via lv.GetElem<T>() / lv.GetChildStringView() etc.
//
// Writing:
// ListViewBuilder b;
// b.append_pod(val); // or b.append_blob(sv);
// Property p = Property::from_list_data(b.finish_pod<int32_t>());
// col.set_any(idx, p, /*insert_safe=*/true);
class ListColumn : public ColumnBase {
public:
// Constructs an unopened column for the given list type. The row count and
// the write frontier both start at 0.
explicit ListColumn(const DataType& list_type)
: list_type_(list_type), size_(0), pos_(0) {}
// Calls close() on destruction.
~ListColumn() override { close(); }

// Opens the column named `name`.
// Preference order:
//   1. If <snapshot_dir>/<name>.items exists, open the snapshot files.
//   2. Otherwise, if work_dir is non-empty, open the files under work_dir.
//   3. Otherwise leave the column empty (size 0, write frontier 0).
// In cases 1 and 2 the row count comes from the item index and the write
// frontier is restored through init_pos().
void open(const std::string& name, const std::string& snapshot_dir,
          const std::string& work_dir) override {
  const std::string snapshot_prefix = snapshot_dir + "/" + name;
  if (std::filesystem::exists(snapshot_prefix + ".items")) {
    items_.open(snapshot_prefix + ".items", false, false);
    data_.open(snapshot_prefix + ".data", false, false);
    size_ = items_.size();
    init_pos(snapshot_prefix + ".pos");
    return;
  }

  if (work_dir.empty()) {
    // Nothing on disk and nowhere to create files: start empty.
    size_ = 0;
    pos_.store(0);
    return;
  }

  const std::string work_prefix = work_dir + "/" + name;
  items_.open(work_prefix + ".items", true);
  data_.open(work_prefix + ".data", true);
  size_ = items_.size();
  init_pos(work_prefix + ".pos");
}

// Opens the column from files sharing `prefix` (".items", ".data", ".pos").
// An empty prefix yields an empty column with size 0 and write frontier 0.
void open_in_memory(const std::string& prefix) override {
  if (prefix.empty()) {
    size_ = 0;
    pos_.store(0);
    return;
  }

  items_.open(prefix + ".items", false);
  data_.open(prefix + ".data", false);
  size_ = items_.size();
  init_pos(prefix + ".pos");
}

// Same as open_in_memory(), but both backing arrays are opened through their
// open_with_hugepages() entry points. An empty prefix yields an empty column.
void open_with_hugepages(const std::string& prefix) override {
  if (prefix.empty()) {
    size_ = 0;
    pos_.store(0);
    return;
  }

  items_.open_with_hugepages(prefix + ".items");
  data_.open_with_hugepages(prefix + ".data");
  size_ = items_.size();
  init_pos(prefix + ".pos");
}

// Resets both backing arrays (item index and blob data). size_ and pos_ are
// left unchanged by this call.
void close() override {
items_.reset();
data_.reset();
}

// Persists the column under the path prefix `filename`:
//   <filename>.pos   -- current write frontier (one size_t)
//   <filename>.items -- item index
//   <filename>.data  -- blob storage
void dump(const std::string& filename) override {
  // Take a plain copy of the atomic frontier so its address can be handed to
  // write_file().
  size_t frontier = pos_.load();
  write_file(filename + ".pos", &frontier, sizeof(frontier), 1);

  items_.dump(filename + ".items");
  data_.dump(filename + ".data");
}

// Number of rows currently tracked by the column (value of size_).
size_t size() const override { return size_; }

// Resizes the column to `size` rows under the exclusive lock.
// The data buffer is sized to the largest of: its current size, the committed
// write frontier, and a 64-bytes-per-row heuristic, so this call never
// shrinks it below what is already in use.
void resize(size_t size) override {
  std::unique_lock<std::shared_mutex> guard(rw_mutex_);
  items_.resize(size);

  // Never drop below what is already allocated or already committed.
  const size_t floor_bytes = std::max(data_.size(), pos_.load());
  // 64B heuristic per list.
  const size_t heuristic_bytes = size * 64;
  data_.resize(std::max(floor_bytes, heuristic_bytes));

  size_ = size;
}

void resize(size_t size, const Property& default_value) override {
if (default_value.type() != DataTypeId::kList &&
default_value.type() != DataTypeId::kEmpty) {
THROW_RUNTIME_ERROR("Default value type does not match list column");
}
resize(size);
// Leave entries zero-initialized (empty lists) for new slots.
}
Comment on lines +538 to +545
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

5. ListColumn ignores non-empty defaults 🐞 Bug ≡ Correctness

ListColumn::resize(size, default_value) validates the type and then does not apply default_value to
newly-added rows, leaving them empty lists. This violates the ColumnBase contract used elsewhere
(TypedColumn fills new rows with the provided default) and breaks schema DEFAULT list values.
Agent Prompt
### Issue description
`ListColumn::resize(size, default_value)` does not populate new rows with `default_value`, unlike other column implementations.

### Issue Context
Schema default values are used when tables grow (e.g., `EnsureCapacity`). For list columns, non-empty defaults will be silently lost.

### Fix Focus Areas
- include/neug/utils/property/column.h[538-545]
- include/neug/utils/property/column.h[554-575]
- src/storages/graph/vertex_table.cc[185-187]

### Suggested fix
1. Implement default filling for newly-added indices similar to `TypedColumn<T>::resize`:
   - Compute `old_size`, resize storage, then for i in [old_size, size): set items_ to reference the default blob.
2. To avoid duplicating blob bytes N times, you can store the default blob once in `data_` and set all new `items_[i]` to the same `{offset,length}`.
3. If the default blob is empty, leave the zero-initialized entries as-is.
4. Add a unit/integration test that defines a list column with a non-empty DEFAULT and verifies inserted rows (or resized capacity rows) read back that default.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


// Always reports DataTypeId::kList; the full list type is available through
// list_type().
DataTypeId type() const override { return DataTypeId::kList; }

// Returns the full DataType::List(...) of this column, i.e. the DataType that
// was passed to the constructor.
const DataType& list_type() const { return list_type_; }

// Store a pre-built blob (from ListViewBuilder::finish_*) at index idx.
// The blob bytes are copied into the internal data buffer at a freshly
// reserved offset, and the index entry for idx is updated to reference them.
//
// Raises a runtime error if idx is out of range, or if the blob is larger
// than the 32-bit length field of list_storage_item can describe (previously
// the length was silently truncated).
//
// Locking: a shared lock on rw_mutex_ is held while copying and while
// updating the index, so a concurrent resize() (which holds the exclusive
// lock) cannot reallocate the buffers mid-copy. The lock is upgraded to
// exclusive only when the data buffer has to grow.
void set_value(size_t idx, std::string_view blob) {
  std::shared_lock<std::shared_mutex> read_lk(rw_mutex_);
  if (idx >= size_) {
    THROW_RUNTIME_ERROR("Index out of range in ListColumn::set_value");
  }
  const auto length = static_cast<uint32_t>(blob.size());
  if (length != blob.size()) {
    THROW_RUNTIME_ERROR("List blob too large in ListColumn::set_value");
  }

  // Reserve [offset, end) for this blob; fetch_add makes the reservation
  // unique among concurrent writers.
  const size_t offset = pos_.fetch_add(blob.size());
  const size_t end = offset + blob.size();
  if (end > data_.size()) {
    // Upgrade: drop the shared lock, grow under the exclusive lock, then
    // re-acquire the shared lock for the copy.
    read_lk.unlock();
    {
      std::unique_lock<std::shared_mutex> write_lk(rw_mutex_);
      // Re-check: another writer may already have grown the buffer.
      if (end > data_.size()) {
        data_.resize(std::max(data_.size() * 2, end + blob.size()));
      }
    }
    read_lk.lock();
    // The column may have been resized while no lock was held.
    if (idx >= size_) {
      THROW_RUNTIME_ERROR("Index out of range in ListColumn::set_value");
    }
  }

  if (!blob.empty()) {
    std::memcpy(data_.data() + offset, blob.data(), blob.size());
  }
  items_.set(idx, {static_cast<uint64_t>(offset), length});
}

// Stores the list payload of `value` (value.as_list_data()) at idx via
// set_value(). `insert_safe` is not consulted by this implementation.
void set_any(size_t idx, const Property& value, bool insert_safe) override {
set_value(idx, value.as_list_data());
}

// Returns a ListView over the blob stored at idx. The view refers directly to
// bytes inside the column's data buffer (no copy is made here).
// idx is checked against size_ with assert() only.
ListView get_view(size_t idx) const {
  assert(idx < size_);
  const auto& entry = items_.get(idx);
  const char* blob_begin = data_.data() + entry.offset;
  const std::string_view blob(blob_begin, entry.length);
  return ListView(list_type_, blob);
}

// Returns the blob stored at idx wrapped as a Property via
// Property::from_list_data(). No bounds check is performed on idx here.
Property get_prop(size_t idx) const override {
  const auto& entry = items_.get(idx);
  const char* blob_begin = data_.data() + entry.offset;
  const std::string_view blob(blob_begin, entry.length);
  return Property::from_list_data(blob);
}

// Stores the list payload of `prop` (prop.as_list_data()) at idx via
// set_value().
void set_prop(size_t idx, const Property& prop) override {
set_value(idx, prop.as_list_data());
}

// Reads one std::string_view from the archive and stores it at idx via
// set_value(), which copies the bytes into the column's data buffer.
void ingest(uint32_t idx, OutArchive& arc) override {
std::string_view sv;
arc >> sv;
set_value(idx, sv);
}

// Forwards ensure_writable(work_dir) to both backing arrays: the item index
// and the blob data buffer.
void ensure_writable(const std::string& work_dir) override {
items_.ensure_writable(work_dir);
data_.ensure_writable(work_dir);
}

private:
// Restores the write frontier (pos_).
// If `pos_path` exists, the frontier is read from it as a single size_t.
// Otherwise it is reconstructed as the largest offset+length over all entries
// in the item index (0 when the index is empty).
void init_pos(const std::string& pos_path) {
  if (!std::filesystem::exists(pos_path)) {
    // No persisted frontier: derive it from the furthest byte any entry uses.
    size_t frontier = 0;
    const size_t entry_count = items_.size();
    for (size_t idx = 0; idx != entry_count; ++idx) {
      const auto& entry = items_.get(idx);
      const size_t entry_end = static_cast<size_t>(entry.offset) + entry.length;
      if (entry_end > frontier) {
        frontier = entry_end;
      }
    }
    pos_.store(frontier);
    return;
  }

  size_t stored = 0;
  read_file(pos_path, &stored, sizeof(stored), 1);
  pos_.store(stored);
}

// Full list DataType supplied at construction; passed to every ListView.
DataType list_type_;
// Per-row index entries: offset and length of each row's blob in data_.
mmap_array<list_storage_item> items_;
// Packed blob storage referenced by items_.
mmap_array<char> data_;
// Number of rows in the column.
size_t size_;
// Write frontier in data_: the next blob is placed at this byte offset.
std::atomic<size_t> pos_;
// Taken exclusively when buffers are resized.
mutable std::shared_mutex rw_mutex_;
};

std::shared_ptr<ColumnBase> CreateColumn(DataType type);

/// Create RefColumn for ease of usage for hqps
Expand Down
Loading