From 0f6fb3ffab5b14156d377a1daf096c7ef3861717 Mon Sep 17 00:00:00 2001 From: exmy Date: Mon, 22 Jun 2026 11:59:24 +0800 Subject: [PATCH 1/6] [CH] Implement map_from_entries function Add ClickHouse backend support for map_from_entries and its LAST_WIN variant, including Substrait function mapping and tests for null and duplicate-key behavior. --- .../clickhouse/CHSparkPlanExecApi.scala | 7 + .../GlutenFunctionValidateSuite.scala | 62 ++++- .../Functions/SparkFunctionMapFromEntries.cpp | 240 ++++++++++++++++++ .../CommonScalarFunctionParser.cpp | 2 + .../expression/ExpressionConverter.scala | 10 +- .../gluten/expression/ExpressionNames.scala | 1 + 6 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 832932f0a49..f36722e9cfd 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -403,6 +403,13 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { original: GetMapValue): ExpressionTransformer = GetMapValueTransformer(substraitExprName, left, right, failOnError = false, original) + /** Transform map_from_entries to Substrait. */ + override def genMapFromEntriesTransformer( + substraitExprName: String, + child: ExpressionTransformer, + expr: Expression): ExpressionTransformer = + GenericExpressionTransformer(substraitExprName, Seq(child), expr) + /** * Generate ShuffleDependency for ColumnarShuffleExchangeExec. * diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala index 07d44cc309e..c5627db4ddb 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala @@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.clickhouse.CHConfig import org.apache.gluten.config.GlutenConfig import org.apache.gluten.expression.{FlattenedAnd, FlattenedOr} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, GlutenTestUtils, Row} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -1072,6 +1072,66 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS } } + test("Test map_from_entries") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { + val query = + """ + |select id, map_from_entries(entries) from ( + | select id, + | case + | when id = 0 then array( + | named_struct('key', cast(1 as int), 'value', 'a'), + | named_struct('key', cast(2 as int), 'value', cast(null as string))) + | when id = 1 then cast(array() as array>) + | when id = 2 then cast(null as array>) + | else array( + | cast(null as struct), + | named_struct('key', cast(4 as int), 'value', 'd')) + | end as entries + | from range(4) + |) order by id + |""".stripMargin + runQueryAndCompare(query)(checkGlutenPlan[ProjectExecTransformer]) + + intercept[SparkException] { + sql( + """ + |select map_from_entries(array( + | named_struct('key', cast(null as int), 'value', 'a'))) + |from range(1) + |""".stripMargin).collect() + } + + intercept[SparkException] { + sql( + """ + |select map_from_entries(array( + | named_struct('key', cast(1 as int), 'value', 'a'), + | named_struct('key', cast(1 as int), 'value', 'b'))) + |from range(1) + |""".stripMargin).collect() + } + } + } + + test("Test map_from_entries with LAST_WIN map key policy") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + (ConstantFolding.ruleName + "," + NullPropagation.ruleName), + SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString + ) { + runQueryAndCompare( + """ + |select map_from_entries(array( + | named_struct('key', cast(1 as int), 'value', 'a'), + | named_struct('key', cast(1 as int), 'value', 'b'))) + |from range(1) + |""".stripMargin)(checkGlutenPlan[ProjectExecTransformer]) + } + } + test("Test transform_keys/transform_values") { val sql = """ diff --git a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp new file mode 100644 index 00000000000..d2c04bc164e --- /dev/null +++ b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int ILLEGAL_COLUMN; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +template +class SparkFunctionMapFromEntries : public IFunction +{ +public: + static constexpr auto name = last_win ? "sparkMapFromEntriesLastWin" : "sparkMapFromEntries"; + + static FunctionPtr create(ContextPtr) { return std::make_shared(); } + + String getName() const override { return name; } + + size_t getNumberOfArguments() const override { return 1; } + + bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo &) const override { return true; } + bool useDefaultImplementationForConstants() const override { return true; } + bool useDefaultImplementationForNulls() const override { return false; } + bool useDefaultImplementationForLowCardinalityColumns() const override { return false; } + + DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override + { + const auto * array_type = checkAndGetDataType(removeNullable(arguments[0]).get()); + if (!array_type) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Argument for function {} must be Array, but it has type {}", + getName(), + arguments[0]->getName()); + + const auto & entry_type = array_type->getNestedType(); + const auto * tuple_type = checkAndGetDataType(removeNullable(entry_type).get()); + if (!tuple_type || tuple_type->getElements().size() != 2) + throw Exception( + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Argument for function {} must be Array of pair Tuple, but it has nested type {}", + getName(), + entry_type->getName()); + + const auto & elements = tuple_type->getElements(); + auto map_type = std::make_shared(removeNullableOrLowCardinalityNullable(elements[0]), elements[1]); + if (arguments[0]->isNullable() || entry_type->isNullable()) + return makeNullable(map_type); + return map_type; + } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + { + ColumnPtr holder = arguments[0].column->convertToFullColumnIfConst(); + + const PaddedPODArray * input_null_map = nullptr; + if (const auto * nullable = checkAndGetColumn(holder.get())) + { + input_null_map = &nullable->getNullMapData(); + holder = nullable->getNestedColumnPtr(); + } + + const auto * entries_array = checkAndGetColumn(holder.get()); + if (!entries_array) + throw Exception( + ErrorCodes::ILLEGAL_COLUMN, + "Argument column for function {} must be Array, but it is {}", + getName(), + holder->getName()); + + const auto & entries_offsets = entries_array->getOffsets(); + const IColumn * entries_data = &entries_array->getData(); + const PaddedPODArray * entry_null_map = nullptr; + if (const auto * nullable_entries = checkAndGetColumn(entries_data)) + { + entry_null_map = &nullable_entries->getNullMapData(); + entries_data = &nullable_entries->getNestedColumn(); + } + + const auto * entries_tuple = checkAndGetColumn(entries_data); + if (!entries_tuple || entries_tuple->tupleSize() != 2) + throw Exception( + ErrorCodes::ILLEGAL_COLUMN, + "Nested column for function {} must be Tuple with 2 elements, but it is {}", + getName(), + entries_data->getName()); + + const auto & key_column = entries_tuple->getColumn(0); + const auto & value_column = entries_tuple->getColumn(1); + ColumnPtr key_insert_holder; + const IColumn * key_insert_column = &key_column; + if (const auto * nullable_key_column = checkAndGetColumn(&key_column)) + key_insert_column = &nullable_key_column->getNestedColumn(); + else if (const auto * low_cardinality_key_column = checkAndGetColumn(&key_column); + low_cardinality_key_column && low_cardinality_key_column->nestedIsNullable()) + { + key_insert_holder = low_cardinality_key_column->cloneWithDefaultOnNull(); + key_insert_column = key_insert_holder.get(); + } + + const auto & result_map_type = assert_cast(*removeNullable(result_type)); + auto result_key_column = result_map_type.getKeyType()->createColumn(); + auto result_value_column = result_map_type.getValueType()->createColumn(); + auto result_offsets_column = ColumnArray::ColumnOffsets::create(); + auto & result_offsets = result_offsets_column->getData(); + result_offsets.reserve(input_rows_count); + + ColumnUInt8::MutablePtr result_null_map; + PaddedPODArray * result_null_map_data = nullptr; + if (result_type->isNullable()) + { + result_null_map = ColumnUInt8::create(input_rows_count, 0); + result_null_map_data = &result_null_map->getData(); + } + + size_t previous_entry_offset = 0; + size_t result_offset = 0; + for (size_t row = 0; row < input_rows_count; ++row) + { + const auto current_entry_offset = entries_offsets[row]; + + if (input_null_map && (*input_null_map)[row]) + { + if (result_null_map_data) + (*result_null_map_data)[row] = 1; + result_offsets.push_back(result_offset); + previous_entry_offset = current_entry_offset; + continue; + } + + bool has_null_entry = false; + if (entry_null_map) + { + for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) + { + if ((*entry_null_map)[entry]) + { + has_null_entry = true; + break; + } + } + } + + if (has_null_entry) + { + if (result_null_map_data) + (*result_null_map_data)[row] = 1; + result_offsets.push_back(result_offset); + previous_entry_offset = current_entry_offset; + continue; + } + + std::vector> selected_entries; + selected_entries.reserve(current_entry_offset - previous_entry_offset); + for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) + { + if (key_column.isNullAt(entry)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot use NULL as map key in function {}", getName()); + + bool has_duplicate_key = false; + for (auto & selected_entry : selected_entries) + { + if (key_column.compareAt(entry, selected_entry.first, key_column, 1) == 0) + { + has_duplicate_key = true; + if constexpr (last_win) + { + selected_entry.second = entry; + break; + } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Duplicate map key is found in function {}", getName()); + } + } + + if (!has_duplicate_key) + selected_entries.emplace_back(entry, entry); + } + + for (const auto selected_entry : selected_entries) + { + result_key_column->insertFrom(*key_insert_column, selected_entry.first); + result_value_column->insertFrom(value_column, selected_entry.second); + ++result_offset; + } + + result_offsets.push_back(result_offset); + previous_entry_offset = current_entry_offset; + } + + auto nested_column = ColumnArray::create( + ColumnTuple::create(Columns{std::move(result_key_column), std::move(result_value_column)}), std::move(result_offsets_column)); + auto result_column = ColumnMap::create(std::move(nested_column)); + if (result_type->isNullable()) + return ColumnNullable::create(std::move(result_column), std::move(result_null_map)); + return result_column; + } +}; + +REGISTER_FUNCTION(SparkMapFromEntries) +{ + factory.registerFunction>(); + factory.registerFunction>(); +} + +} diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/CommonScalarFunctionParser.cpp b/cpp-ch/local-engine/Parser/scalar_function_parser/CommonScalarFunctionParser.cpp index c1b7dcdb2eb..ae760543bb8 100644 --- a/cpp-ch/local-engine/Parser/scalar_function_parser/CommonScalarFunctionParser.cpp +++ b/cpp-ch/local-engine/Parser/scalar_function_parser/CommonScalarFunctionParser.cpp @@ -173,6 +173,8 @@ REGISTER_COMMON_SCALAR_FUNCTION_PARSER(GetMapValue, get_map_value, arrayElementO REGISTER_COMMON_SCALAR_FUNCTION_PARSER(MapKeys, map_keys, mapKeys); REGISTER_COMMON_SCALAR_FUNCTION_PARSER(MapValues, map_values, mapValues); REGISTER_COMMON_SCALAR_FUNCTION_PARSER(MapFromArrays, map_from_arrays, mapFromArrays); +REGISTER_COMMON_SCALAR_FUNCTION_PARSER(MapFromEntries, map_from_entries, sparkMapFromEntries); +REGISTER_COMMON_SCALAR_FUNCTION_PARSER(MapFromEntriesLastWin, map_from_entries_last_win, sparkMapFromEntriesLastWin); // json functions REGISTER_COMMON_SCALAR_FUNCTION_PARSER(FlattenJsonStringOnRequired, flattenJSONStringOnRequired, flattenJSONStringOnRequired); diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 7969d305025..82c19021afa 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -336,10 +336,16 @@ object ExpressionConverter extends SQLConfHelper with Logging { replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap), m) case m: MapFromEntries => + val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY) BackendsApiManager.getSparkPlanExecApiInstance.genMapFromEntriesTransformer( - substraitExprName, + if (mapKeyDedupPolicy.toString == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { + ExpressionNames.MAP_FROM_ENTRIES_LAST_WIN + } else { + substraitExprName + }, replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap), - m) + m + ) case e: Explode => ExplodeTransformer( substraitExprName, diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala index ca05f0ade1a..dd5c3a188b6 100644 --- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala @@ -308,6 +308,7 @@ object ExpressionNames { final val TRANSFORM_KEYS = "transform_keys" final val TRANSFORM_VALUES = "transform_values" final val MAP_FROM_ENTRIES = "map_from_entries" + final val MAP_FROM_ENTRIES_LAST_WIN = "map_from_entries_last_win" final val STR_TO_MAP = "str_to_map" final val MAP_FILTER = "map_filter" final val MAP_CONTAINS_KEY = "map_contains_key" From 163cdf3d2d9549eb8347fde1c04cd218ff7b5f42 Mon Sep 17 00:00:00 2001 From: exmy Date: Wed, 24 Jun 2026 17:58:42 +0800 Subject: [PATCH 2/6] [CH] Fix map_from_entries empty array handling --- .../GlutenFunctionValidateSuite.scala | 2 + .../Functions/SparkFunctionMapFromEntries.cpp | 77 +++++++++++++++++-- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala index c5627db4ddb..70a7fe0563a 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala @@ -1094,6 +1094,8 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS |) order by id |""".stripMargin runQueryAndCompare(query)(checkGlutenPlan[ProjectExecTransformer]) + runQueryAndCompare("select map_from_entries(array()) from range(1)")( + checkGlutenPlan[ProjectExecTransformer]) intercept[SparkException] { sql( diff --git a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp index d2c04bc164e..5e708ad1cd0 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -69,7 +70,18 @@ class SparkFunctionMapFromEntries : public IFunction arguments[0]->getName()); const auto & entry_type = array_type->getNestedType(); - const auto * tuple_type = checkAndGetDataType(removeNullable(entry_type).get()); + const auto entry_type_without_nullable = removeNullable(entry_type); + if (isNothing(entry_type_without_nullable)) + { + auto map_type = std::make_shared( + std::make_shared(), + std::make_shared()); + if (arguments[0]->isNullable() || entry_type->isNullable()) + return makeNullable(map_type); + return map_type; + } + + const auto * tuple_type = checkAndGetDataType(entry_type_without_nullable.get()); if (!tuple_type || tuple_type->getElements().size() != 2) throw Exception( ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, @@ -84,7 +96,10 @@ class SparkFunctionMapFromEntries : public IFunction return map_type; } - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override + ColumnPtr executeImpl( + const ColumnsWithTypeAndName & arguments, + const DataTypePtr & result_type, + size_t input_rows_count) const override { ColumnPtr holder = arguments[0].column->convertToFullColumnIfConst(); @@ -112,6 +127,55 @@ class SparkFunctionMapFromEntries : public IFunction entries_data = &nullable_entries->getNestedColumn(); } + const auto & result_map_type = assert_cast(*removeNullable(result_type)); + if (isNothing(entries_data->getDataType())) + { + auto result_key_column = result_map_type.getKeyType()->createColumn(); + auto result_value_column = result_map_type.getValueType()->createColumn(); + auto result_offsets_column = ColumnArray::ColumnOffsets::create(input_rows_count, 0); + + ColumnUInt8::MutablePtr result_null_map; + PaddedPODArray * result_null_map_data = nullptr; + if (result_type->isNullable()) + { + result_null_map = ColumnUInt8::create(input_rows_count, 0); + result_null_map_data = &result_null_map->getData(); + } + + size_t previous_entry_offset = 0; + for (size_t row = 0; row < input_rows_count; ++row) + { + const auto current_entry_offset = entries_offsets[row]; + if (input_null_map && (*input_null_map)[row]) + { + if (result_null_map_data) + (*result_null_map_data)[row] = 1; + } + else if (entry_null_map) + { + for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) + { + if ((*entry_null_map)[entry]) + { + if (result_null_map_data) + (*result_null_map_data)[row] = 1; + break; + } + } + } + previous_entry_offset = current_entry_offset; + } + + auto nested_column = ColumnArray::create( + ColumnTuple::create( + Columns{std::move(result_key_column), std::move(result_value_column)}), + std::move(result_offsets_column)); + auto result_column = ColumnMap::create(std::move(nested_column)); + if (result_type->isNullable()) + return ColumnNullable::create(std::move(result_column), std::move(result_null_map)); + return result_column; + } + const auto * entries_tuple = checkAndGetColumn(entries_data); if (!entries_tuple || entries_tuple->tupleSize() != 2) throw Exception( @@ -133,7 +197,6 @@ class SparkFunctionMapFromEntries : public IFunction key_insert_column = key_insert_holder.get(); } - const auto & result_map_type = assert_cast(*removeNullable(result_type)); auto result_key_column = result_map_type.getKeyType()->createColumn(); auto result_value_column = result_map_type.getValueType()->createColumn(); auto result_offsets_column = ColumnArray::ColumnOffsets::create(); @@ -203,7 +266,10 @@ class SparkFunctionMapFromEntries : public IFunction selected_entry.second = entry; break; } - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Duplicate map key is found in function {}", getName()); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Duplicate map key is found in function {}", + getName()); } } @@ -223,7 +289,8 @@ class SparkFunctionMapFromEntries : public IFunction } auto nested_column = ColumnArray::create( - ColumnTuple::create(Columns{std::move(result_key_column), std::move(result_value_column)}), std::move(result_offsets_column)); + ColumnTuple::create(Columns{std::move(result_key_column), std::move(result_value_column)}), + std::move(result_offsets_column)); auto result_column = ColumnMap::create(std::move(nested_column)); if (result_type->isNullable()) return ColumnNullable::create(std::move(result_column), std::move(result_null_map)); From 5be7f8b9ccdd8a5a7cd74ddc4da385476ee1ba8e Mon Sep 17 00:00:00 2001 From: exmy Date: Thu, 25 Jun 2026 14:34:22 +0800 Subject: [PATCH 3/6] fix --- .../gluten/execution/GlutenFunctionValidateSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala index 70a7fe0563a..602d57d4a88 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala @@ -1094,8 +1094,9 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS |) order by id |""".stripMargin runQueryAndCompare(query)(checkGlutenPlan[ProjectExecTransformer]) - runQueryAndCompare("select map_from_entries(array()) from range(1)")( - checkGlutenPlan[ProjectExecTransformer]) + runQueryAndCompare( + "select map_from_entries(cast(array() as array>)) " + + "from range(1)")(checkGlutenPlan[ProjectExecTransformer]) intercept[SparkException] { sql( From 1a0e0d6f1d63e51de895a9568ac0064cbc88a7a8 Mon Sep 17 00:00:00 2001 From: exmy Date: Fri, 26 Jun 2026 14:53:16 +0800 Subject: [PATCH 4/6] fix --- .../Functions/SparkFunctionMapFromEntries.cpp | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp index 5e708ad1cd0..b7e90cccbfa 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp @@ -146,23 +146,21 @@ class SparkFunctionMapFromEntries : public IFunction for (size_t row = 0; row < input_rows_count; ++row) { const auto current_entry_offset = entries_offsets[row]; - if (input_null_map && (*input_null_map)[row]) - { - if (result_null_map_data) - (*result_null_map_data)[row] = 1; - } - else if (entry_null_map) + const bool input_map_null = input_null_map && (*input_null_map)[row]; + bool entry_map_null = false; + if (!input_map_null && entry_null_map) { for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) { if ((*entry_null_map)[entry]) { - if (result_null_map_data) - (*result_null_map_data)[row] = 1; + entry_map_null = true; break; } } } + if (result_null_map_data) + (*result_null_map_data)[row] = input_map_null || entry_map_null; previous_entry_offset = current_entry_offset; } From d91bcb1a0f1de0b6f5d74fd0f0860e2c1b51ea07 Mon Sep 17 00:00:00 2001 From: exmy Date: Fri, 26 Jun 2026 15:13:01 +0800 Subject: [PATCH 5/6] Optimize map_from_entries duplicate key lookup --- .../Functions/SparkFunctionMapFromEntries.cpp | 208 +++++++++++++----- 1 file changed, 155 insertions(+), 53 deletions(-) diff --git a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp index b7e90cccbfa..eafc1fa3b3d 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionMapFromEntries.cpp @@ -29,8 +29,10 @@ #include #include #include +#include #include #include +#include #include namespace DB @@ -217,82 +219,182 @@ class SparkFunctionMapFromEntries : public IFunction if (input_null_map && (*input_null_map)[row]) { - if (result_null_map_data) - (*result_null_map_data)[row] = 1; - result_offsets.push_back(result_offset); + appendNullMap(result_null_map_data, row, result_offsets, result_offset); previous_entry_offset = current_entry_offset; continue; } - bool has_null_entry = false; - if (entry_null_map) + if (hasNullEntry(entry_null_map, previous_entry_offset, current_entry_offset)) { - for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) - { - if ((*entry_null_map)[entry]) - { - has_null_entry = true; - break; - } - } - } - - if (has_null_entry) - { - if (result_null_map_data) - (*result_null_map_data)[row] = 1; - result_offsets.push_back(result_offset); + appendNullMap(result_null_map_data, row, result_offsets, result_offset); previous_entry_offset = current_entry_offset; continue; } - std::vector> selected_entries; - selected_entries.reserve(current_entry_offset - previous_entry_offset); - for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) - { - if (key_column.isNullAt(entry)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot use NULL as map key in function {}", getName()); + auto selected_entries = + selectEntriesForRow(key_column, previous_entry_offset, current_entry_offset); + appendSelectedEntries( + selected_entries, + *key_insert_column, + value_column, + *result_key_column, + *result_value_column, + result_offset); + + result_offsets.push_back(result_offset); + previous_entry_offset = current_entry_offset; + } + + auto nested_column = ColumnArray::create( + ColumnTuple::create(Columns{std::move(result_key_column), std::move(result_value_column)}), + std::move(result_offsets_column)); + auto result_column = ColumnMap::create(std::move(nested_column)); + if (result_type->isNullable()) + return ColumnNullable::create(std::move(result_column), std::move(result_null_map)); + return result_column; + } + +private: + struct UInt128Hash + { + size_t operator()(const UInt128 & value) const + { + return std::hash{}(value.items[0]) + ^ (std::hash{}(value.items[1]) << 1); + } + }; + + using SelectedEntry = std::pair; + using SelectedEntries = std::vector; + + static bool hasNullEntry( + const PaddedPODArray * entry_null_map, + size_t previous_entry_offset, + size_t current_entry_offset) + { + if (!entry_null_map) + return false; + + for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) + { + if ((*entry_null_map)[entry]) + return true; + } + return false; + } - bool has_duplicate_key = false; - for (auto & selected_entry : selected_entries) + static void appendNullMap( + PaddedPODArray * result_null_map_data, + size_t row, + ColumnArray::Offsets & result_offsets, + size_t result_offset) + { + if (result_null_map_data) + (*result_null_map_data)[row] = 1; + result_offsets.push_back(result_offset); + } + + static SelectedEntries selectEntriesForRow( + const IColumn & key_column, + size_t previous_entry_offset, + size_t current_entry_offset) + { + SelectedEntries selected_entries; + selected_entries.reserve(current_entry_offset - previous_entry_offset); + + std::unordered_map first_selected_index_by_hash; + first_selected_index_by_hash.reserve(current_entry_offset - previous_entry_offset); + std::unordered_map, UInt128Hash> + collision_selected_indices_by_hash; + + for (size_t entry = previous_entry_offset; entry < current_entry_offset; ++entry) + { + if (key_column.isNullAt(entry)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot use NULL as map key in function {}", name); + + SipHash hash_function; + key_column.updateHashWithValue(entry, hash_function); + const UInt128 hash = hash_function.get128(); + + bool has_duplicate_key = false; + size_t duplicate_selected_index = 0; + const auto first_selected_index_it = first_selected_index_by_hash.find(hash); + if (first_selected_index_it != first_selected_index_by_hash.end()) + { + const auto first_selected_index = first_selected_index_it->second; + if (key_column.compareAt( + entry, + selected_entries[first_selected_index].first, + key_column, + 1) == 0) + { + has_duplicate_key = true; + duplicate_selected_index = first_selected_index; + } + else { - if (key_column.compareAt(entry, selected_entry.first, key_column, 1) == 0) + const auto collision_selected_indices_it = + collision_selected_indices_by_hash.find(hash); + if (collision_selected_indices_it != collision_selected_indices_by_hash.end()) { - has_duplicate_key = true; - if constexpr (last_win) + for (const auto selected_index : collision_selected_indices_it->second) { - selected_entry.second = entry; - break; + if (key_column.compareAt( + entry, + selected_entries[selected_index].first, + key_column, + 1) == 0) + { + has_duplicate_key = true; + duplicate_selected_index = selected_index; + break; + } } - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Duplicate map key is found in function {}", - getName()); } } - - if (!has_duplicate_key) - selected_entries.emplace_back(entry, entry); } - for (const auto selected_entry : selected_entries) + if (has_duplicate_key) { - result_key_column->insertFrom(*key_insert_column, selected_entry.first); - result_value_column->insertFrom(value_column, selected_entry.second); - ++result_offset; + if constexpr (last_win) + { + selected_entries[duplicate_selected_index].second = entry; + continue; + } + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Duplicate map key is found in function {}", + name); } - result_offsets.push_back(result_offset); - previous_entry_offset = current_entry_offset; + if (first_selected_index_it == first_selected_index_by_hash.end()) + { + first_selected_index_by_hash.emplace(hash, selected_entries.size()); + } + else + { + collision_selected_indices_by_hash[hash].push_back(selected_entries.size()); + } + selected_entries.emplace_back(entry, entry); } - auto nested_column = ColumnArray::create( - ColumnTuple::create(Columns{std::move(result_key_column), std::move(result_value_column)}), - std::move(result_offsets_column)); - auto result_column = ColumnMap::create(std::move(nested_column)); - if (result_type->isNullable()) - return ColumnNullable::create(std::move(result_column), std::move(result_null_map)); - return result_column; + return selected_entries; + } + + static void appendSelectedEntries( + const SelectedEntries & selected_entries, + const IColumn & key_insert_column, + const IColumn & value_column, + IColumn & result_key_column, + IColumn & result_value_column, + size_t & result_offset) + { + for (const auto & selected_entry : selected_entries) + { + result_key_column.insertFrom(key_insert_column, selected_entry.first); + result_value_column.insertFrom(value_column, selected_entry.second); + ++result_offset; + } } }; From daef642d3049f0b61434646a0f902efd6da4e83d Mon Sep 17 00:00:00 2001 From: exmy Date: Fri, 26 Jun 2026 15:22:39 +0800 Subject: [PATCH 6/6] Move map_from_entries LAST_WIN rewrite to CH backend --- .../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 13 +++++++++++-- .../gluten/expression/ExpressionConverter.scala | 7 +------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index f36722e9cfd..36c9813fbb8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -56,6 +56,7 @@ import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildS import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.utils.{CHExecUtil, PushDownUtil} import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SparkVersionUtil @@ -407,8 +408,16 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { override def genMapFromEntriesTransformer( substraitExprName: String, child: ExpressionTransformer, - expr: Expression): ExpressionTransformer = - GenericExpressionTransformer(substraitExprName, Seq(child), expr) + expr: Expression): ExpressionTransformer = { + val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY) + val chExprName = + if (mapKeyDedupPolicy.toString == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { + ExpressionNames.MAP_FROM_ENTRIES_LAST_WIN + } else { + substraitExprName + } + GenericExpressionTransformer(chExprName, Seq(child), expr) + } /** * Generate ShuffleDependency for ColumnarShuffleExchangeExec. diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 82c19021afa..4f322a66d88 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -336,13 +336,8 @@ object ExpressionConverter extends SQLConfHelper with Logging { replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap), m) case m: MapFromEntries => - val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY) BackendsApiManager.getSparkPlanExecApiInstance.genMapFromEntriesTransformer( - if (mapKeyDedupPolicy.toString == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { - ExpressionNames.MAP_FROM_ENTRIES_LAST_WIN - } else { - substraitExprName - }, + substraitExprName, replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap), m )