Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions be/src/core/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,11 +563,27 @@ class IColumn : public COW<IColumn> {

/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
/// Shallow: doesn't do recursive calls; don't do call for itself.
using MutableColumnCallback = std::function<void(WrappedPtr&)>;
using ColumnCallback = std::function<void(const IColumn&)>;
virtual void for_each_subcolumn(MutableColumnCallback) {}
virtual void for_each_subcolumn(ColumnCallback) const {}

protected:
virtual void mutate_subcolumns() {}

static void mutate_subcolumn(WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
}

template <typename ColumnType>
static void mutate_subcolumn(typename ColumnType::WrappedPtr& subcolumn) {
auto mutated = std::move(*static_cast<const typename ColumnType::Ptr&>(subcolumn)).mutate();
auto typed_mutated = ColumnType::cast_to_column_mutptr(
assert_cast<ColumnType*, TypeCheckOnRelease::DISABLE>(mutated.get()));
mutated = nullptr;
static_cast<typename ColumnType::Ptr&>(subcolumn) = std::move(typed_mutated);
}

public:
/// Columns have equal structure.
/// If true - you can use "compare_at", "insert_from", etc. methods.
virtual bool structure_equals(const IColumn&) const {
Expand All @@ -581,10 +597,7 @@ class IColumn : public COW<IColumn> {
// exclusive nodes are reused through the COW fast path.
MutablePtr mutate() const&& {
MutablePtr res = shallow_mutate();
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
res->mutate_subcolumns();
return res;
}

Expand All @@ -595,10 +608,7 @@ class IColumn : public COW<IColumn> {
static MutablePtr mutate(Ptr ptr) {
MutablePtr res = ptr->shallow_mutate(); /// Now use_count is 2.
ptr.reset(); /// Reset use_count to 1.
res->for_each_subcolumn([](WrappedPtr& subcolumn) {
static_cast<IColumn::Ptr&>(subcolumn) =
std::move(*static_cast<const IColumn::Ptr&>(subcolumn)).mutate();
});
res->mutate_subcolumns();
return res;
}

Expand Down
12 changes: 3 additions & 9 deletions be/src/core/column/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "core/field.h"
#include "core/string_ref.h"
#include "core/types.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -216,14 +215,9 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
return get_offsets()[i] - get_offsets()[i - 1];
}

void for_each_subcolumn(MutableColumnCallback callback) override {
IColumn::WrappedPtr offsets_column(std::move(static_cast<ColumnOffsets::Ptr&>(offsets)));
Defer defer([&] {
static_cast<ColumnOffsets::Ptr&>(offsets) =
cast_to_column<ColumnOffsets>(static_cast<const IColumn::Ptr&>(offsets_column));
});
callback(offsets_column);
callback(data);
void mutate_subcolumns() override {
mutate_subcolumn<ColumnOffsets>(offsets);
mutate_subcolumn(data);
}

void for_each_subcolumn(ColumnCallback callback) const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/core/column/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
}
}

void for_each_subcolumn(MutableColumnCallback callback) override { callback(data); }
void mutate_subcolumns() override { mutate_subcolumn(data); }

void for_each_subcolumn(ColumnCallback callback) const override {
callback(*static_cast<const IColumn::Ptr&>(data));
Expand Down
14 changes: 4 additions & 10 deletions be/src/core/column/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "core/string_ref.h"
#include "core/types.h"
#include "exec/common/sip_hash.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -74,15 +73,10 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {

std::string get_name() const override;

void for_each_subcolumn(MutableColumnCallback callback) override {
IColumn::WrappedPtr offsets(std::move(static_cast<COffsets::Ptr&>(offsets_column)));
Defer defer([&] {
static_cast<COffsets::Ptr&>(offsets_column) =
cast_to_column<COffsets>(static_cast<const IColumn::Ptr&>(offsets));
});
callback(keys_column);
callback(values_column);
callback(offsets);
void mutate_subcolumns() override {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This COW refactor changes ColumnMap from the old callback/defer path to a class-specific mutate_subcolumns() that must enumerate all three children (keys_column, values_column, and typed offsets_column). The new tests cover array and nullable, but there is no focused test that mutates a shared/exclusive ColumnMap and verifies all three child pointers are detached or preserved correctly. A missed or wrong entry in this method would compile and leave one map child aliased across mutation, so please add a BE unit test analogous to the new array/nullable cases for ColumnMap.

mutate_subcolumn(keys_column);
mutate_subcolumn(values_column);
mutate_subcolumn<COffsets>(offsets_column);
}

void for_each_subcolumn(ColumnCallback callback) const override {
Expand Down
12 changes: 3 additions & 9 deletions be/src/core/column/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "core/typeid_cast.h"
#include "core/types.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"

class SipHash;

Expand Down Expand Up @@ -250,14 +249,9 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
return get_ptr();
}

void for_each_subcolumn(MutableColumnCallback callback) override {
callback(_nested_column);

IColumn::WrappedPtr null_map(std::move(static_cast<ColumnUInt8::Ptr&>(_null_map)));
Defer defer([&] {
_null_map = cast_to_column<ColumnUInt8>(static_cast<const IColumn::Ptr&>(null_map));
});
callback(null_map);
void mutate_subcolumns() override {
mutate_subcolumn(_nested_column);
mutate_subcolumn<ColumnUInt8>(_null_map);
}

void for_each_subcolumn(ColumnCallback callback) const override {
Expand Down
4 changes: 2 additions & 2 deletions be/src/core/column/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ bool ColumnStruct::has_enough_capacity(const IColumn& src) const {
return true;
}

void ColumnStruct::for_each_subcolumn(MutableColumnCallback callback) {
void ColumnStruct::mutate_subcolumns() {
for (auto& column : columns) {
callback(column);
mutate_subcolumn(column);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/core/column/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
size_t byte_size() const override;
size_t allocated_bytes() const override;
bool has_enough_capacity(const IColumn& src) const override;
void for_each_subcolumn(MutableColumnCallback callback) override;
void mutate_subcolumns() override;
void for_each_subcolumn(ColumnCallback callback) const override;
bool structure_equals(const IColumn& rhs) const override;

Expand Down
20 changes: 12 additions & 8 deletions be/src/core/column/column_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
#include "exprs/aggregate/aggregate_function.h"
#include "exprs/json_functions.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"
#include "util/json/path_in_data.h"
#include "util/jsonb_document.h"
#include "util/jsonb_document_cast.h"
Expand Down Expand Up @@ -826,15 +825,14 @@ size_t ColumnVariant::allocated_bytes() const {
return res;
}

void ColumnVariant::for_each_subcolumn(MutableColumnCallback callback) {
void ColumnVariant::mutate_subcolumns() {
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
callback(part);
mutate_subcolumn(part);
}
}
callback(serialized_sparse_column);
callback(serialized_doc_value_column);
// callback may be filter, so the row count may be changed
mutate_subcolumn(serialized_sparse_column);
mutate_subcolumn(serialized_doc_value_column);
num_rows = serialized_sparse_column->size();
ENABLE_CHECK_CONSISTENCY(this);
}
Expand Down Expand Up @@ -2385,7 +2383,7 @@ size_t ColumnVariant::filter(const Filter& filter) {
for (auto& subcolumn : subcolumns) {
subcolumn->data.num_rows = count;
}
MutableColumnCallback callback = [&](IColumn::WrappedPtr& part) {
auto filter_part = [&](IColumn::WrappedPtr& part) {
if (part->size() != count) {
if (part->is_exclusive()) {
const auto result_size = part->filter(filter);
Expand All @@ -2401,7 +2399,13 @@ size_t ColumnVariant::filter(const Filter& filter) {
}
}
};
for_each_subcolumn(callback);
for (auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
filter_part(part);
}
}
filter_part(serialized_sparse_column);
filter_part(serialized_doc_value_column);
}
num_rows = count;
ENABLE_CHECK_CONSISTENCY(this);
Expand Down
2 changes: 1 addition & 1 deletion be/src/core/column/column_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {

bool has_enough_capacity(const IColumn& src) const override { return false; }

void for_each_subcolumn(MutableColumnCallback callback) override;
void mutate_subcolumns() override;
void for_each_subcolumn(ColumnCallback callback) const override;

// Do nothing, call try_insert instead
Expand Down
23 changes: 23 additions & 0 deletions be/test/core/column/column_array_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.

#include "core/column/column_array.h"

#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>

#include "core/column/column.h"
#include "core/column/column_vector.h"
#include "core/column/common_column_test.h"
#include "core/types.h"
#include "exprs/function/function_test_util.h"
Expand All @@ -28,6 +31,26 @@
// for example column_ip should test these functions

namespace doris {

TEST(ColumnArrayCowTest, MutateKeepsExclusiveSubcolumns) {
auto data = ColumnInt64::create();
data->insert_value(10);

auto offsets = ColumnArray::ColumnOffsets::create();
offsets->insert_value(1);

ColumnPtr array = ColumnArray::create(std::move(data), std::move(offsets));
const auto& array_ref = assert_cast<const ColumnArray&>(*array);
const auto* data_raw = array_ref.get_data_ptr().get();
const auto* offsets_raw = array_ref.get_offsets_ptr().get();

auto mutated = IColumn::mutate(std::move(array));
const auto& mutated_array = assert_cast<const ColumnArray&>(*mutated);

EXPECT_EQ(mutated_array.get_data_ptr().get(), data_raw);
EXPECT_EQ(mutated_array.get_offsets_ptr().get(), offsets_raw);
}

static std::string test_result_dir;
class ColumnArrayTest : public CommonColumnTest {
protected:
Expand Down
43 changes: 43 additions & 0 deletions be/test/core/column/column_nullable_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,49 @@ TEST(ColumnNullableTest, SharedCreatePreservesImmutableSubcolumns) {
EXPECT_EQ(null_map_alias->size(), 1);
}

TEST(ColumnNullableTest, MutateDetachesTypedNullMap) {
auto nested_mut = ColumnInt64::create();
nested_mut->insert_value(10);
ColumnPtr nested = std::move(nested_mut);
ColumnPtr nested_alias = nested;

auto null_map_mut = ColumnUInt8::create();
null_map_mut->insert_value(0);
ColumnPtr null_map = std::move(null_map_mut);
ColumnPtr null_map_alias = null_map;

ColumnPtr nullable = ColumnNullable::create(nested, null_map);
auto mutated = IColumn::mutate(nullable);
auto& mutated_nullable = assert_cast<ColumnNullable&>(*mutated);

EXPECT_NE(mutated_nullable.get_nested_column_ptr().get(), nested_alias.get());
EXPECT_NE(mutated_nullable.get_null_map_column_ptr().get(), null_map_alias.get());
EXPECT_EQ(mutated_nullable.get_null_map_data()[0], 0);

mutated_nullable.get_null_map_data()[0] = 1;
const auto& original_null_map = assert_cast<const ColumnUInt8&>(*null_map_alias);
EXPECT_EQ(original_null_map.get_data()[0], 0);
}

TEST(ColumnNullableTest, MutateKeepsExclusiveSubcolumns) {
auto nested_mut = ColumnInt64::create();
nested_mut->insert_value(10);

auto null_map_mut = ColumnUInt8::create();
null_map_mut->insert_value(0);

ColumnPtr nullable = ColumnNullable::create(std::move(nested_mut), std::move(null_map_mut));
const auto& nullable_ref = assert_cast<const ColumnNullable&>(*nullable);
const auto* nested_raw = nullable_ref.get_nested_column_ptr().get();
const auto* null_map_raw = nullable_ref.get_null_map_column_ptr().get();

auto mutated = IColumn::mutate(std::move(nullable));
const auto& mutated_nullable = assert_cast<const ColumnNullable&>(*mutated);

EXPECT_EQ(mutated_nullable.get_nested_column_ptr().get(), nested_raw);
EXPECT_EQ(mutated_nullable.get_null_map_column_ptr().get(), null_map_raw);
}

TEST(ColumnNullableTest, append_data_by_selector) {
auto srt_column = ColumnHelper::create_nullable_column<DataTypeInt64>(
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
Expand Down
Loading