From 55590bb1aedfd19ceaf02a4913bc4994298b1df1 Mon Sep 17 00:00:00 2001 From: 924060929 Date: Mon, 15 Jun 2026 16:04:57 +0800 Subject: [PATCH] [fix](retention) limit param count to 32 to avoid BE heap overflow retention() keeps its aggregate state in a fixed-size RetentionState::events[32] array and serializes it into a single int64 bitmap, so it supports at most 32 events. Neither FE nor BE enforced this limit: - AggregateFunctionRetention::add() iterates over all argument columns and calls RetentionState::set(i) (events[i] = 1) with no bound check, so more than 32 params write past events[] (heap out-of-bounds write). - insert_result_into() reads events[] for the actual argument count, reading past the array as well. With retention() called with 102 boolean args this corrupted the heap and later crashed BE in the streaming aggregation output path (ColumnString length corruption "string column length is too large: total_length=4295003729" -> memcpy SIGSEGV in StreamingAggLocalState::_get_results_with_serialized_key). Enforce the 32-event limit on both sides: - FE Retention.checkLegalityBeforeTypeCoercion rejects > 32 params with a clear AnalysisException at planning time. - BE AggregateFunctionRetention ctor throws INVALID_ARGUMENT when given more than RetentionState::MAX_EVENTS argument types, as a backstop for any path reaching BE. --- .../aggregate/aggregate_function_retention.h | 13 +++++- .../exprs/aggregate/vec_retention_test.cpp | 17 +++++++ .../expressions/functions/agg/Retention.java | 11 +++++ ...est_aggregate_retention_param_limit.groovy | 44 +++++++++++++++++++ 4 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy diff --git a/be/src/exprs/aggregate/aggregate_function_retention.h b/be/src/exprs/aggregate/aggregate_function_retention.h index 189a31b441a80a..6132179eaf6962 100644 --- a/be/src/exprs/aggregate/aggregate_function_retention.h +++ b/be/src/exprs/aggregate/aggregate_function_retention.h @@ -27,6 +27,8 @@ #include #include +#include "common/exception.h" +#include "common/status.h" #include "core/assert_cast.h" #include "core/column/column.h" #include "core/column/column_array.h" @@ -112,7 +114,16 @@ class AggregateFunctionRetention final public: AggregateFunctionRetention(const DataTypes& argument_types_) : IAggregateFunctionDataHelper( - argument_types_) {} + argument_types_) { + // RetentionState only has room for MAX_EVENTS(32) events (fixed-size events[] array, + // plus an int64 serialized bitmap). More params would overflow events[] in add()/ + // insert_result_into() and corrupt the heap, so reject it at construction time. + if (argument_types_.size() > RetentionState::MAX_EVENTS) { + throw Exception(ErrorCode::INVALID_ARGUMENT, + "retention function can accept at most {} params, but got {}", + RetentionState::MAX_EVENTS, argument_types_.size()); + } + } String get_name() const override { return "retention"; } diff --git a/be/test/exprs/aggregate/vec_retention_test.cpp b/be/test/exprs/aggregate/vec_retention_test.cpp index 21966fb7f986f8..ae3cb320c0f36e 100644 --- a/be/test/exprs/aggregate/vec_retention_test.cpp +++ b/be/test/exprs/aggregate/vec_retention_test.cpp @@ -22,6 +22,7 @@ #include #include +#include "common/exception.h" #include "common/logging.h" #include "core/assert_cast.h" #include "core/column/column_array.h" @@ -33,6 +34,7 @@ #include "core/string_buffer.hpp" #include "core/types.h" #include "exprs/aggregate/aggregate_function.h" +#include "exprs/aggregate/aggregate_function_retention.h" #include "exprs/aggregate/aggregate_function_simple_factory.h" #include "gtest/gtest_pred_impl.h" @@ -293,4 +295,19 @@ TEST_F(VRetentionTest, testSerialize) { agg_function->destroy(place2); agg_function->destroy(place3); } + +TEST_F(VRetentionTest, testMaxEventsBoundary) { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + + // 32 boolean params is the maximum allowed (RetentionState::MAX_EVENTS) and must succeed. + DataTypes max_types(RetentionState::MAX_EVENTS, std::make_shared()); + auto fn = factory.get("retention", max_types, nullptr, false, -1); + EXPECT_NE(fn, nullptr); + + // 33 boolean params overflow the fixed-size events[32] array; the function must be rejected + // at construction time instead of corrupting the heap. + DataTypes too_many_types(RetentionState::MAX_EVENTS + 1, std::make_shared()); + EXPECT_THROW({ factory.get("retention", too_many_types, nullptr, false, -1); }, + doris::Exception); +} } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java index 24c2801e0aa9c2..87177f34b3856b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Retention.java @@ -37,6 +37,13 @@ public class Retention extends NullableAggregateFunction implements ExplicitlyCastableSignature { + // The BE side stores the retention state in a fixed-size array + // (RetentionState::MAX_EVENTS, uint8_t events[32]) and also serializes it into a single + // int64 bitmap, so at most 32 conditions can be represented. Passing more than 32 params + // overflows that array on BE and causes a heap out-of-bounds write/read (BE core). + // Keep this in sync with be/src/exprs/aggregate/aggregate_function_retention.h. + public static final int MAX_EVENTS = 32; + public static final List SIGNATURES = ImmutableList.of( FunctionSignature.ret(ArrayType.of(BooleanType.INSTANCE)).varArgs(BooleanType.INSTANCE) ); @@ -70,6 +77,10 @@ public void checkLegalityBeforeTypeCoercion() { if (this.children.isEmpty()) { throw new AnalysisException("The " + functionName + " function must have at least one param"); } + if (children.size() > MAX_EVENTS) { + throw new AnalysisException("The " + functionName + " function can accept at most " + MAX_EVENTS + + " params, but got " + children.size()); + } for (int i = 0; i < children.size(); i++) { if (!getArgumentType(i).isBooleanType()) { diff --git a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy new file mode 100644 index 00000000000000..3fb3fbbc0e9316 --- /dev/null +++ b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_retention_param_limit.groovy @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// retention() stores its state in a fixed-size events[32] array on BE. Passing more than +// 32 conditions used to overflow that array and core the BE. FE must reject +// > 32 params with a clear error instead of sending the query to BE. +suite("test_aggregate_retention_param_limit") { + sql "DROP TABLE IF EXISTS retention_param_limit_test" + sql """ + CREATE TABLE retention_param_limit_test ( + uid INT, + dt DATETIME + ) + DUPLICATE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + sql """ INSERT INTO retention_param_limit_test VALUES (1, '2026-01-01'), (1, '2026-01-02'), (2, '2026-01-01') """ + + def conds = { int n -> (1..n).collect { "uid = ${it}" }.join(", ") } + + // 32 conditions is the maximum allowed and must succeed. + sql """ SELECT uid, retention(${conds(32)}) FROM retention_param_limit_test GROUP BY uid ORDER BY uid """ + + // 33 conditions must be rejected by FE with a clear error (not a BE crash). + test { + sql """ SELECT uid, retention(${conds(33)}) FROM retention_param_limit_test GROUP BY uid """ + exception "at most 32" + } +}