diff --git a/be/src/exprs/aggregate/aggregate_function_retention.h b/be/src/exprs/aggregate/aggregate_function_retention.h index 189a31b441a80a..6132179eaf6962 100644 --- a/be/src/exprs/aggregate/aggregate_function_retention.h +++ b/be/src/exprs/aggregate/aggregate_function_retention.h @@ -27,6 +27,8 @@ #include #include +#include "common/exception.h" +#include "common/status.h" #include "core/assert_cast.h" #include "core/column/column.h" #include "core/column/column_array.h" @@ -112,7 +114,16 @@ class AggregateFunctionRetention final public: AggregateFunctionRetention(const DataTypes& argument_types_) : IAggregateFunctionDataHelper( - argument_types_) {} + argument_types_) { + // RetentionState only has room for MAX_EVENTS(32) events (fixed-size events[] array, + // plus an int64 serialized bitmap). More params would overflow events[] in add()/ + // insert_result_into() and corrupt the heap, so reject it at construction time. + if (argument_types_.size() > RetentionState::MAX_EVENTS) { + throw Exception(ErrorCode::INVALID_ARGUMENT, + "retention function can accept at most {} params, but got {}", + RetentionState::MAX_EVENTS, argument_types_.size()); + } + } String get_name() const override { return "retention"; } diff --git a/be/test/exprs/aggregate/vec_retention_test.cpp b/be/test/exprs/aggregate/vec_retention_test.cpp index 21966fb7f986f8..ae3cb320c0f36e 100644 --- a/be/test/exprs/aggregate/vec_retention_test.cpp +++ b/be/test/exprs/aggregate/vec_retention_test.cpp @@ -22,6 +22,7 @@ #include #include +#include "common/exception.h" #include "common/logging.h" #include "core/assert_cast.h" #include "core/column/column_array.h" @@ -33,6 +34,7 @@ #include "core/string_buffer.hpp" #include "core/types.h" #include "exprs/aggregate/aggregate_function.h" +#include "exprs/aggregate/aggregate_function_retention.h" #include "exprs/aggregate/aggregate_function_simple_factory.h" #include "gtest/gtest_pred_impl.h" @@ -293,4 +295,19 @@ TEST_F(VRetentionTest, testSerialize) { agg_function->destroy(place2); agg_function->destroy(place3); } + +TEST_F(VRetentionTest, testMaxEventsBoundary) { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + + // 32 boolean params is the maximum allowed (RetentionState::MAX_EVENTS) and must succeed. + DataTypes max_types(RetentionState::MAX_EVENTS, std::make_shared()); + auto fn = factory.get("retention", max_types, nullptr, false, -1); + EXPECT_NE(fn, nullptr); + + // 33 boolean params overflow the fixed-size events[32] array; the function must be rejected + // at construction time instead of corrupting the heap. + DataTypes too_many_types(RetentionState::MAX_EVENTS + 1, std::make_shared()); + EXPECT_THROW({ factory.get("retention", too_many_types, nullptr, false, -1); }, + doris::Exception); +} } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java index 24c2801e0aa9c2..87177f34b3856b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java @@ -37,6 +37,13 @@ public class Retention extends NullableAggregateFunction implements ExplicitlyCastableSignature { + // The BE side stores the retention state in a fixed-size array + // (RetentionState::MAX_EVENTS, uint8_t events[32]) and also serializes it into a single + // int64 bitmap, so at most 32 conditions can be represented. Passing more than 32 params + // overflows that array on BE and causes a heap out-of-bounds write/read (BE core). + // Keep this in sync with be/src/exprs/aggregate/aggregate_function_retention.h. + public static final int MAX_EVENTS = 32; + public static final List SIGNATURES = ImmutableList.of( FunctionSignature.ret(ArrayType.of(BooleanType.INSTANCE)).varArgs(BooleanType.INSTANCE) ); @@ -70,6 +77,10 @@ public void checkLegalityBeforeTypeCoercion() { if (this.children.isEmpty()) { throw new AnalysisException("The " + functionName + " function must have at least one param"); } + if (children.size() > MAX_EVENTS) { + throw new AnalysisException("The " + functionName + " function can accept at most " + MAX_EVENTS + + " params, but got " + children.size()); + } for (int i = 0; i < children.size(); i++) { if (!getArgumentType(i).isBooleanType()) { diff --git a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy new file mode 100644 index 00000000000000..3fb3fbbc0e9316 --- /dev/null +++ b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// retention() stores its state in a fixed-size events[32] array on BE. Passing more than +// 32 conditions used to overflow that array and core the BE. FE must reject +// > 32 params with a clear error instead of sending the query to BE. +suite("test_aggregate_retention_param_limit") { + sql "DROP TABLE IF EXISTS retention_param_limit_test" + sql """ + CREATE TABLE retention_param_limit_test ( + uid INT, + dt DATETIME + ) + DUPLICATE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + sql """ INSERT INTO retention_param_limit_test VALUES (1, '2026-01-01'), (1, '2026-01-02'), (2, '2026-01-01') """ + + def conds = { int n -> (1..n).collect { "uid = ${it}" }.join(", ") } + + // 32 conditions is the maximum allowed and must succeed. + sql """ SELECT uid, retention(${conds(32)}) FROM retention_param_limit_test GROUP BY uid ORDER BY uid """ + + // 33 conditions must be rejected by FE with a clear error (not a BE crash). + test { + sql """ SELECT uid, retention(${conds(33)}) FROM retention_param_limit_test GROUP BY uid """ + exception "at most 32" + } +}