Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions be/src/core/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,11 +563,27 @@ class IColumn : public COW<IColumn> {

/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
/// Shallow: doesn't do recursive calls; don't do call for itself.
using MutableColumnCallback = std::function<void(WrappedPtr&)>;
using ColumnCallback = std::function<void(const IColumn&)>;
virtual void for_each_subcolumn(MutableColumnCallback) {}
virtual void for_each_subcolumn(ColumnCallback) const {}

protected:
virtual void mutate_subcolumns() {}

static void mutate_subcolumn(WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
}

template <typename ColumnType>
static void mutate_subcolumn(typename ColumnType::WrappedPtr& subcolumn) {
auto mutated = std::move(*static_cast<const typename ColumnType::Ptr&>(subcolumn)).mutate();
auto typed_mutated = ColumnType::cast_to_column_mutptr(
assert_cast<ColumnType*, TypeCheckOnRelease::DISABLE>(mutated.get()));
mutated = nullptr;
static_cast<typename ColumnType::Ptr&>(subcolumn) = std::move(typed_mutated);
}

public:
/// Columns have equal structure.
/// If true - you can use "compare_at", "insert_from", etc. methods.
virtual bool structure_equals(const IColumn&) const {
Expand All @@ -581,10 +597,7 @@ class IColumn : public COW<IColumn> {
// exclusive nodes are reused through the COW fast path.
MutablePtr mutate() const&& {
MutablePtr res = shallow_mutate();
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
res->mutate_subcolumns();
return res;
}

Expand All @@ -595,10 +608,7 @@ class IColumn : public COW<IColumn> {
static MutablePtr mutate(Ptr ptr) {
MutablePtr res = ptr->shallow_mutate(); /// Now use_count is 2.
ptr.reset(); /// Reset use_count to 1.
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
res->mutate_subcolumns();
return res;
}

Expand Down
12 changes: 3 additions & 9 deletions be/src/core/column/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "core/field.h"
#include "core/string_ref.h"
#include "core/types.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -216,14 +215,9 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
return get_offsets()[i] - get_offsets()[i - 1];
}

void for_each_subcolumn(MutableColumnCallback callback) override {
IColumn::WrappedPtr offsets_column(std::move(static_cast<ColumnOffsets::Ptr&>(offsets)));
Defer defer([&] {
static_cast<ColumnOffsets::Ptr&>(offsets) =
cast_to_column<ColumnOffsets>(static_cast<const IColumn::Ptr&>(offsets_column));
});
callback(offsets_column);
callback(data);
void mutate_subcolumns() override {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new hook still needs a shared-detach test. The added array COW test builds the array from moved mutable children and then calls IColumn::mutate(std::move(array)), so it only exercises the exclusive fast path; it would not catch a regression where the by-value/shared IColumn::mutate(array) path forgets to detach either the nested data or the typed offsets. Please add a small test analogous to the nullable/map shared tests that keeps aliases to both child columns, calls IColumn::mutate(array) without moving the source owner, and verifies the result detached both children while the aliases remain unchanged. Since this PR also adds mutate_subcolumns() hooks for ColumnConst and ColumnStruct, small direct mutate tests for those hooks would keep the COW refactor covered across all changed composite columns.

mutate_subcolumn<ColumnOffsets>(offsets);
mutate_subcolumn(data);
}

void for_each_subcolumn(ColumnCallback callback) const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/core/column/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
}
}

void for_each_subcolumn(MutableColumnCallback callback) override { callback(data); }
void mutate_subcolumns() override { mutate_subcolumn(data); }

void for_each_subcolumn(ColumnCallback callback) const override {
callback(*static_cast<const IColumn::Ptr&>(data));
Expand Down
14 changes: 4 additions & 10 deletions be/src/core/column/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "core/string_ref.h"
#include "core/types.h"
#include "exec/common/sip_hash.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -74,15 +73,10 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {

std::string get_name() const override;

void for_each_subcolumn(MutableColumnCallback callback) override {
IColumn::WrappedPtr offsets(std::move(static_cast<COffsets::Ptr&>(offsets_column)));
Defer defer([&] {
static_cast<COffsets::Ptr&>(offsets_column) =
cast_to_column<COffsets>(static_cast<const IColumn::Ptr&>(offsets));
});
callback(keys_column);
callback(values_column);
callback(offsets);
void mutate_subcolumns() override {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This COW refactor changes ColumnMap from the old callback/defer path to a class-specific mutate_subcolumns() that must enumerate all three children (keys_column, values_column, and typed offsets_column). The new tests cover array and nullable, but there is no focused test that mutates a shared/exclusive ColumnMap and verifies all three child pointers are detached or preserved correctly. A missed or wrong entry in this method would compile and leave one map child aliased across mutation, so please add a BE unit test analogous to the new array/nullable cases for ColumnMap.

mutate_subcolumn(keys_column);
mutate_subcolumn(values_column);
mutate_subcolumn<COffsets>(offsets_column);
}

void for_each_subcolumn(ColumnCallback callback) const override {
Expand Down
12 changes: 3 additions & 9 deletions be/src/core/column/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "core/typeid_cast.h"
#include "core/types.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -250,14 +249,9 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
return get_ptr();
}

void for_each_subcolumn(MutableColumnCallback callback) override {
callback(_nested_column);

IColumn::WrappedPtr null_map(std::move(static_cast<ColumnUInt8::Ptr&>(_null_map)));
Defer defer([&] {
_null_map = cast_to_column<ColumnUInt8>(static_cast<const IColumn::Ptr&>(null_map));
});
callback(null_map);
void mutate_subcolumns() override {
mutate_subcolumn(_nested_column);
mutate_subcolumn<ColumnUInt8>(_null_map);
}

void for_each_subcolumn(ColumnCallback callback) const override {
Expand Down
4 changes: 2 additions & 2 deletions be/src/core/column/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ bool ColumnStruct::has_enough_capacity(const IColumn& src) const {
return true;
}

void ColumnStruct::for_each_subcolumn(MutableColumnCallback callback) {
void ColumnStruct::mutate_subcolumns() {
for (auto& column : columns) {
callback(column);
mutate_subcolumn(column);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/core/column/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;
void for_each_subcolumn(MutableColumnCallback callback) override;
void mutate_subcolumns() override;
void for_each_subcolumn(ColumnCallback callback) const override;
bool structure_equals(const IColumn& rhs) const override;

Expand Down
20 changes: 12 additions & 8 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
#include "exprs/aggregate/aggregate_function.h"
#include "exprs/json_functions.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"
#include "util/json/path_in_data.h"
#include "util/jsonb_document.h"
#include "util/jsonb_document_cast.h"
Expand Down Expand Up @@ -826,15 +825,14 @@ size_t ColumnVariant::allocated_bytes() const {
return res;
}

void ColumnVariant::for_each_subcolumn(MutableColumnCallback callback) {
void ColumnVariant::mutate_subcolumns() {
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
callback(part);
mutate_subcolumn(part);
}
}
callback(serialized_sparse_column);
callback(serialized_doc_value_column);
// callback may be filter, so the row count may be changed
mutate_subcolumn(serialized_sparse_column);
mutate_subcolumn(serialized_doc_value_column);
num_rows = serialized_sparse_column->size();
ENABLE_CHECK_CONSISTENCY(this);
}
Expand Down Expand Up @@ -2385,7 +2383,7 @@ size_t ColumnVariant::filter(const Filter& filter) {
for (auto& subcolumn : subcolumns) {
subcolumn->data.num_rows = count;
}
MutableColumnCallback callback = [&](IColumn::WrappedPtr& part) {
auto filter_part = [&](IColumn::WrappedPtr& part) {
if (part->size() != count) {
if (part->is_exclusive()) {
const auto result_size = part->filter(filter);
Expand All @@ -2401,7 +2399,13 @@ size_t ColumnVariant::filter(const Filter& filter) {
}
}
};
for_each_subcolumn(callback);
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
filter_part(part);
}
}
filter_part(serialized_sparse_column);
filter_part(serialized_doc_value_column);
}
num_rows = count;
ENABLE_CHECK_CONSISTENCY(this);
Expand Down
2 changes: 1 addition & 1 deletion be/src/core/column/column_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {

bool has_enough_capacity(const IColumn& src) const override { return false; }

void for_each_subcolumn(MutableColumnCallback callback) override;
void mutate_subcolumns() override;
void for_each_subcolumn(ColumnCallback callback) const override;

// Do nothing, call try_insert instead
Expand Down
42 changes: 42 additions & 0 deletions be/test/core/column/column_array_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.

#include "core/column/column_array.h"

#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>

#include "core/column/column.h"
#include "core/column/column_vector.h"
#include "core/column/common_column_test.h"
#include "core/types.h"
#include "exprs/function/function_test_util.h"
Expand All @@ -28,6 +31,45 @@
// for example column_ip should test these functions

namespace doris {

TEST(ColumnArrayCowTest, MutateDetachesSharedSubcolumns) {
auto data_mut = ColumnInt64::create();
data_mut->insert_value(10);
ColumnPtr data = std::move(data_mut);
ColumnPtr data_alias = data;

auto offsets_mut = ColumnArray::ColumnOffsets::create();
offsets_mut->insert_value(1);
ColumnPtr offsets = std::move(offsets_mut);
ColumnPtr offsets_alias = offsets;

ColumnPtr array = ColumnArray::create(data, offsets);
auto mutated = IColumn::mutate(array);
const auto& mutated_array = assert_cast<const ColumnArray&>(*mutated);

EXPECT_NE(mutated_array.get_data_ptr().get(), data_alias.get());
EXPECT_NE(mutated_array.get_offsets_ptr().get(), offsets_alias.get());
}

TEST(ColumnArrayCowTest, MutateKeepsExclusiveSubcolumns) {
auto data = ColumnInt64::create();
data->insert_value(10);

auto offsets = ColumnArray::ColumnOffsets::create();
offsets->insert_value(1);

ColumnPtr array = ColumnArray::create(std::move(data), std::move(offsets));
const auto& array_ref = assert_cast<const ColumnArray&>(*array);
const auto* data_raw = array_ref.get_data_ptr().get();
const auto* offsets_raw = array_ref.get_offsets_ptr().get();

auto mutated = IColumn::mutate(std::move(array));
const auto& mutated_array = assert_cast<const ColumnArray&>(*mutated);

EXPECT_EQ(mutated_array.get_data_ptr().get(), data_raw);
EXPECT_EQ(mutated_array.get_offsets_ptr().get(), offsets_raw);
}

static std::string test_result_dir;
class ColumnArrayTest : public CommonColumnTest {
protected:
Expand Down
27 changes: 27 additions & 0 deletions be/test/core/column/column_const_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,33 @@

namespace doris {

TEST(ColumnConstCowTest, MutateDetachesSharedDataColumn) {
auto data_mut = ColumnInt64::create();
data_mut->insert_value(7);
ColumnPtr data = std::move(data_mut);
ColumnPtr data_alias = data;

ColumnPtr column_const = ColumnConst::create(data, 3);
auto mutated = IColumn::mutate(column_const);
const auto& mutated_const = assert_cast<const ColumnConst&>(*mutated);

EXPECT_NE(mutated_const.get_data_column_ptr().get(), data_alias.get());
EXPECT_EQ(assert_cast<const ColumnInt64&>(*data_alias).get_element(0), 7);
EXPECT_EQ(mutated_const.get_data_column().get_int(0), 7);
}

TEST(ColumnConstCowTest, MutateKeepsExclusiveDataColumn) {
auto data = ColumnInt64::create();
data->insert_value(7);
const auto* data_raw = data.get();

ColumnPtr column_const = ColumnConst::create(std::move(data), 3);
auto mutated = IColumn::mutate(std::move(column_const));
const auto& mutated_const = assert_cast<const ColumnConst&>(*mutated);

EXPECT_EQ(mutated_const.get_data_column_ptr().get(), data_raw);
}

TEST(ColumnConstTest, TestCreate) {
auto column_data = ColumnHelper::create_column<DataTypeInt64>({7});
auto column_const = ColumnConst::create(column_data, 10);
Expand Down
49 changes: 49 additions & 0 deletions be/test/core/column/column_map_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,55 @@

namespace doris {

TEST(ColumnMapCowTest, MutateDetachesSharedSubcolumns) {
auto keys_mut = ColumnInt64::create();
keys_mut->insert_value(10);
ColumnPtr keys = std::move(keys_mut);
ColumnPtr keys_alias = keys;

auto values_mut = ColumnInt64::create();
values_mut->insert_value(100);
ColumnPtr values = std::move(values_mut);
ColumnPtr values_alias = values;

auto offsets_mut = ColumnMap::COffsets::create();
offsets_mut->insert_value(1);
ColumnPtr offsets = std::move(offsets_mut);
ColumnPtr offsets_alias = offsets;

ColumnPtr map = ColumnMap::create(keys, values, offsets);
auto mutated = IColumn::mutate(map);
const auto& mutated_map = assert_cast<const ColumnMap&>(*mutated);

EXPECT_NE(mutated_map.get_keys_ptr().get(), keys_alias.get());
EXPECT_NE(mutated_map.get_values_ptr().get(), values_alias.get());
EXPECT_NE(mutated_map.get_offsets_ptr().get(), offsets_alias.get());
}

TEST(ColumnMapCowTest, MutateKeepsExclusiveSubcolumns) {
auto keys = ColumnInt64::create();
keys->insert_value(10);

auto values = ColumnInt64::create();
values->insert_value(100);

auto offsets = ColumnMap::COffsets::create();
offsets->insert_value(1);

ColumnPtr map = ColumnMap::create(std::move(keys), std::move(values), std::move(offsets));
const auto& map_ref = assert_cast<const ColumnMap&>(*map);
const auto* keys_raw = map_ref.get_keys_ptr().get();
const auto* values_raw = map_ref.get_values_ptr().get();
const auto* offsets_raw = map_ref.get_offsets_ptr().get();

auto mutated = IColumn::mutate(std::move(map));
const auto& mutated_map = assert_cast<const ColumnMap&>(*mutated);

EXPECT_EQ(mutated_map.get_keys_ptr().get(), keys_raw);
EXPECT_EQ(mutated_map.get_values_ptr().get(), values_raw);
EXPECT_EQ(mutated_map.get_offsets_ptr().get(), offsets_raw);
}

class ColumnMapTest : public ::testing::Test {
protected:
void SetUp() override {}
Expand Down
Loading
Loading