Skip to content

Commit d71571c

Browse files
committed
fix: ResultSet SSO corruption + materializing-op caps + merge-agg schema check
Second batch of fixes from the codebase review. 1059/1096 tests pass (0 failures, 37 skipped require Docker), 14 new tests added. 1. ResultSet.own_string: SSO use-after-free in owned_strings vector This was the actual root cause of the garbled output the user observed in the bare-metal benchmarks (SELECT * FROM users showing pointer-like IDs and corrupted text columns). The diagnosis the agent gave -- about line 711 in mysql_server.cpp -- was wrong; the real bug lives here. owned_strings was std::vector<std::string>. own_string() emplace_back'd a std::string and returned a StringRef pointing at the new entry's c_str(). For short strings using SSO, the character data lives INSIDE the std::string object itself, so when the vector reallocated and move-constructed each std::string at a new heap address, every previously-handed-out StringRef became a dangling pointer to freed memory. Long strings happened to survive (their character buffer stays at the same heap address through the move) but short strings like "Alice", "Eng", "Sales" were silently corrupted. Fix: switch to std::deque<std::string>. push_back on a deque does not move existing elements, so element addresses (and thus c_str() pointers) remain valid for the lifetime of the deque. Verified: I temporarily reverted to std::vector<std::string> with the new tests in place; the regression tests fail with corrupted data, exactly matching the production symptom. Restored the deque fix and the tests pass cleanly. 2. MergeAggregateOperator: validate per-row schema matches what DistributedPlanner promised The merge expected exactly [group_key_count] + [merge_op_count] columns per partial-aggregate row. A schema drift between what the planner built and what the shard returned would silently truncate or misalign the merge, producing wrong aggregate results. Now we throw a clear runtime_error on the first row that doesn't match. 3. Hard row caps on materializing operators (engine_limits.h) Sort, Aggregate, HashJoin, NestedLoopJoin, Distinct, SetOp, Window now cap their materialized state at kDefaultMaxOperatorRows (10 million, overridable at compile time via SQL_ENGINE_MAX_OPERATOR_ROWS). This converts a "process killed by OOM" failure into a clean std::runtime_error with the offending operator name. Not a substitute for spill-to-disk, but it stops a runaway query from taking down the engine. Verified-not-a-bug: - Correlated subqueries: the agent claimed "scaffolding without wiring", but PlanExecutor::setup_subquery_executor DOES wire the correlated callback, expression_eval DOES pass the outer resolver through, and filter_op/project_op DO consult outer_resolver_ on cache miss. Existing CorrelatedSubqueryExists / Scalar / Avg tests already pass. Added 2 new tests covering correlated NOT EXISTS and correlated IN with arithmetic on outer columns -- both pass. New tests: - tests/test_result_set.cpp: 4 tests pinning down the deque invariant for short strings, long strings, mixed sizes, and Value::str_val round-trip after many subsequent push_backs. - tests/test_operators.cpp: 3 MergeAggregate schema-validation tests (matching schema, too few cols, too many cols) and 3 engine_limits helper tests (throws at limit, allows below, error message). - tests/test_subquery.cpp: 2 new correlated subquery tests (NOT EXISTS, IN with outer column reference + arithmetic).
1 parent 9982885 commit d71571c

14 files changed

Lines changed: 437 additions & 5 deletions

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ TEST_SRCS = $(TEST_DIR)/test_main.cpp \
8080
$(TEST_DIR)/test_distributed_txn.cpp \
8181
$(TEST_DIR)/test_window.cpp \
8282
$(TEST_DIR)/test_cte.cpp \
83-
$(TEST_DIR)/test_datetime_format.cpp
83+
$(TEST_DIR)/test_datetime_format.cpp \
84+
$(TEST_DIR)/test_result_set.cpp
8485
TEST_OBJS = $(TEST_SRCS:.cpp=.o)
8586
TEST_TARGET = $(PROJECT_ROOT)/run_tests
8687

include/sql_engine/engine_limits.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#ifndef SQL_ENGINE_ENGINE_LIMITS_H
2+
#define SQL_ENGINE_ENGINE_LIMITS_H
3+
4+
// engine_limits.h -- safety caps for materializing operators.
5+
//
6+
// Several operators (Sort, Aggregate, HashJoin, NestedLoopJoin, Distinct,
7+
// SetOp, Window) materialize their input or build state into in-memory
8+
// collections. Without a cap, a runaway query can exhaust available memory
9+
// and bring down the whole process via SIGKILL.
10+
//
11+
// We don't have spill-to-disk yet. As a defensive measure, every
12+
// materializing operator enforces a hard row-count cap and throws a clear
13+
// std::runtime_error when it's exceeded. This converts a "process killed,
14+
// no message" failure into "query exceeds engine memory limit".
15+
//
16+
// The cap is intentionally generous (10 million rows) so existing
17+
// well-behaved queries are unaffected, but it's small enough that a single
18+
// query can't allocate tens of gigabytes of row state. For operators with
19+
// unusual storage shapes (HashJoin's hash table, Aggregate's group map)
20+
// the same cap applies to the number of distinct entries, which is what
21+
// actually drives memory usage.
22+
//
23+
// The cap can be raised globally by defining
24+
// SQL_ENGINE_MAX_OPERATOR_ROWS at compile time, and individual call sites
25+
// can pass a per-operator override via a setter (not currently exposed).
26+
27+
#include <cstddef>
28+
#include <stdexcept>
29+
#include <string>
30+
31+
namespace sql_engine {
32+
33+
#ifdef SQL_ENGINE_MAX_OPERATOR_ROWS
34+
inline constexpr std::size_t kDefaultMaxOperatorRows = SQL_ENGINE_MAX_OPERATOR_ROWS;
35+
#else
36+
inline constexpr std::size_t kDefaultMaxOperatorRows = 10'000'000;
37+
#endif
38+
39+
// Throws std::runtime_error with a clear message when an operator's
40+
// materialized row/state count is about to exceed the cap. The op_name
41+
// argument is the operator class name (used in the error message) so users
42+
// can tell which operator hit the limit.
43+
inline void check_operator_row_limit(std::size_t current_count,
44+
std::size_t limit,
45+
const char* op_name) {
46+
if (current_count >= limit) {
47+
throw std::runtime_error(
48+
std::string(op_name) +
49+
": exceeded engine row limit (" + std::to_string(limit) +
50+
" rows / state entries). The engine does not yet spill large "
51+
"intermediate results to disk; raise the limit by defining "
52+
"SQL_ENGINE_MAX_OPERATOR_ROWS at compile time, or rewrite the "
53+
"query to bound the intermediate state.");
54+
}
55+
}
56+
57+
} // namespace sql_engine
58+
59+
#endif // SQL_ENGINE_ENGINE_LIMITS_H

include/sql_engine/operators/aggregate_op.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/operator.h"
55
#include "sql_engine/expression_eval.h"
66
#include "sql_engine/catalog.h"
7+
#include "sql_engine/engine_limits.h"
78
#include "sql_parser/arena.h"
89
#include <functional>
910
#include <vector>
@@ -44,6 +45,8 @@ class AggregateOperator : public Operator {
4445

4546
auto it = groups_.find(key);
4647
if (it == groups_.end()) {
48+
// Cap distinct group count to prevent unbounded memory.
49+
check_operator_row_limit(groups_.size(), kDefaultMaxOperatorRows, "AggregateOperator");
4750
GroupState state;
4851
state.group_values.reserve(group_count_);
4952
for (uint16_t i = 0; i < group_count_; ++i) {

include/sql_engine/operators/distinct_op.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define SQL_ENGINE_OPERATORS_DISTINCT_OP_H
33

44
#include "sql_engine/operator.h"
5+
#include "sql_engine/engine_limits.h"
56
#include <unordered_set>
67
#include <string>
78

@@ -20,6 +21,10 @@ class DistinctOperator : public Operator {
2021
bool next(Row& out) override {
2122
while (child_->next(out)) {
2223
std::string key = row_key(out);
24+
// Cap distinct row count to prevent unbounded seen_ set.
25+
if (seen_.find(key) == seen_.end()) {
26+
check_operator_row_limit(seen_.size(), kDefaultMaxOperatorRows, "DistinctOperator");
27+
}
2328
if (seen_.insert(key).second) {
2429
return true;
2530
}

include/sql_engine/operators/hash_join_op.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "sql_engine/expression_eval.h"
66
#include "sql_engine/catalog.h"
77
#include "sql_engine/plan_node.h"
8+
#include "sql_engine/engine_limits.h"
89
#include "sql_parser/arena.h"
910
#include <functional>
1011
#include <vector>
@@ -50,9 +51,13 @@ class HashJoinOperator : public Operator {
5051
hash_table_.clear();
5152
right_->open();
5253
Row r{};
54+
std::size_t total_build_rows = 0;
5355
while (right_->next(r)) {
56+
// Cap total build-side rows to prevent unbounded hash table.
57+
check_operator_row_limit(total_build_rows, kDefaultMaxOperatorRows, "HashJoinOperator");
5458
uint64_t h = hash_value(r.get(right_join_col_));
5559
hash_table_[h].push_back(copy_row(r));
60+
++total_build_rows;
5661
}
5762
right_->close();
5863

include/sql_engine/operators/join_op.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "sql_engine/expression_eval.h"
66
#include "sql_engine/catalog.h"
77
#include "sql_engine/plan_node.h"
8+
#include "sql_engine/engine_limits.h"
89
#include "sql_parser/arena.h"
910
#include <stdexcept>
1011
#include <functional>
@@ -46,6 +47,8 @@ class NestedLoopJoinOperator : public Operator {
4647
right_rows_.clear();
4748
Row r{};
4849
while (right_->next(r)) {
50+
// Cap right side to prevent O(n*m) explosion + OOM.
51+
check_operator_row_limit(right_rows_.size(), kDefaultMaxOperatorRows, "NestedLoopJoinOperator");
4952
right_rows_.push_back(r);
5053
}
5154
right_->close();

include/sql_engine/operators/merge_aggregate_op.h

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include <string>
1212
#include <cstring>
1313
#include <future>
14-
#include <mutex>
14+
#include <stdexcept>
1515

1616
namespace sql_engine {
1717

@@ -169,6 +169,15 @@ class MergeAggregateOperator : public Operator {
169169
size_t result_idx_ = 0;
170170

171171
void merge_into_groups(const Row& row) {
172+
// Validate that the row's schema matches what DistributedPlanner
173+
// promised: exactly [group_key_count_ group keys] +
174+
// [merge_op_count_ partial aggregate columns]. Previously, mismatched
175+
// schemas were silently truncated by `if (col_idx >= row.column_count)
176+
// continue;` in merge_row, producing silently-wrong aggregates. Throw
177+
// a clear error instead so a remote schema drift is caught at the
178+
// first row.
179+
check_row_schema(row);
180+
172181
std::string key = compute_group_key(row);
173182
auto it = groups_.find(key);
174183
if (it == groups_.end()) {
@@ -189,6 +198,26 @@ class MergeAggregateOperator : public Operator {
189198
merge_row(it->second, row);
190199
}
191200

201+
// Expected schema is exactly group_key_count_ + merge_op_count_ columns,
202+
// in that order. We compute it once into expected_col_count_ on the first
203+
// row to avoid recomputing per row. A mismatch on any row aborts the
204+
// merge with a clear error message instead of silently corrupting state.
205+
void check_row_schema(const Row& row) {
206+
const uint16_t expected =
207+
static_cast<uint16_t>(group_key_count_ + merge_op_count_);
208+
if (row.column_count != expected) {
209+
throw std::runtime_error(
210+
"distributed aggregate merge: shard returned " +
211+
std::to_string(row.column_count) +
212+
" columns, expected " + std::to_string(expected) +
213+
" (" + std::to_string(group_key_count_) +
214+
" group key(s) + " + std::to_string(merge_op_count_) +
215+
" partial aggregate(s)). Remote shard schema does not match "
216+
"what DistributedPlanner built; aborting to avoid silently "
217+
"corrupted aggregate results.");
218+
}
219+
}
220+
192221
std::string compute_group_key(const Row& row) {
193222
std::string key;
194223
for (uint16_t i = 0; i < group_key_count_; ++i) {
@@ -214,9 +243,11 @@ class MergeAggregateOperator : public Operator {
214243
}
215244

216245
void merge_row(GroupState& state, const Row& row) {
246+
// Schema is guaranteed by check_row_schema() above, so we can skip
247+
// bounds checks here. Defensive: still compute col_idx with the
248+
// expected layout.
217249
for (uint16_t i = 0; i < merge_op_count_; ++i) {
218250
uint16_t col_idx = group_key_count_ + i;
219-
if (col_idx >= row.column_count) continue;
220251
Value v = row.get(col_idx);
221252

222253
MergeOp op = static_cast<MergeOp>(merge_ops_[i]);

include/sql_engine/operators/set_op_op.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/operator.h"
55
#include "sql_engine/plan_node.h"
66
#include "sql_engine/thread_pool.h"
7+
#include "sql_engine/engine_limits.h"
78
#include <unordered_set>
89
#include <string>
910
#include <vector>
@@ -46,6 +47,7 @@ class SetOpOperator : public Operator {
4647
Row r{};
4748
while (right_->next(r)) {
4849
check_col_count(r);
50+
check_operator_row_limit(right_set_.size(), kDefaultMaxOperatorRows, "SetOpOperator");
4951
right_set_.insert(row_key(r));
5052
}
5153
right_->close();
@@ -68,6 +70,9 @@ class SetOpOperator : public Operator {
6870
if (got) {
6971
check_col_count(out);
7072
std::string key = row_key(out);
73+
if (seen_.find(key) == seen_.end()) {
74+
check_operator_row_limit(seen_.size(), kDefaultMaxOperatorRows, "SetOpOperator");
75+
}
7176
if (seen_.insert(key).second) return true;
7277
}
7378
}

include/sql_engine/operators/sort_op.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/operator.h"
55
#include "sql_engine/expression_eval.h"
66
#include "sql_engine/catalog.h"
7+
#include "sql_engine/engine_limits.h"
78
#include "sql_parser/arena.h"
89
#include <algorithm>
910
#include <functional>
@@ -33,6 +34,8 @@ class SortOperator : public Operator {
3334

3435
Row row{};
3536
while (child_->next(row)) {
37+
// Hard cap to prevent unbounded materialization (no spill yet).
38+
check_operator_row_limit(rows_.size(), kDefaultMaxOperatorRows, "SortOperator");
3639
rows_.push_back(row);
3740
}
3841
child_->close();

include/sql_engine/operators/window_op.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/operator.h"
55
#include "sql_engine/expression_eval.h"
66
#include "sql_engine/catalog.h"
7+
#include "sql_engine/engine_limits.h"
78
#include "sql_parser/arena.h"
89
#include <functional>
910
#include <vector>
@@ -44,6 +45,7 @@ class WindowOperator : public Operator {
4445
// 1. Consume all child rows into buffer
4546
Row row{};
4647
while (child_->next(row)) {
48+
check_operator_row_limit(buffer_.size(), kDefaultMaxOperatorRows, "WindowOperator");
4749
buffer_.push_back(row);
4850
}
4951
child_->close();

0 commit comments

Comments
 (0)