Commit 9982885

fix: silent correctness bugs and resource safety from codebase review
Six fixes flagged in the recent stubs/incomplete-features review. None change behavior for queries that already worked; all tests still pass (1047/1084 passed, the 37 skipped tests require Docker, 26 new tests, 0 failures).

1. mysql_server: format DATE/TIME/DATETIME/TIMESTAMP/INTERVAL values

   value_to_string had no cases for the temporal Value tags, so date and time columns rendered as empty strings over the wire protocol. Add format_date / format_time / format_datetime helpers in datetime_parse (using Howard Hinnant's civil_from_days algorithm) and wire them in. Also map TAG_DATE/TIME/TIMESTAMP to the proper MySQL field type codes in column metadata instead of falling back to VAR_STRING.

2. SetOpOperator: validate column counts across UNION/INTERSECT/EXCEPT

   Mismatched column counts on the two sides of a set operation used to silently truncate via row_key(), producing wrong results. Track the first row's column_count and throw runtime_error on any mismatch.

3. NestedLoopJoinOperator / HashJoinOperator: reject unsupported join types

   RIGHT and FULL joins (and SEMI/ANTI in HashJoin) silently fell through to the INNER code path, returning wrong rows. Throw runtime_error in open() with a clear "not yet supported" message until they're properly implemented.

4. ThreadSafeMultiRemoteExecutor: RAII for connection + result set

   Replace manual checkout/checkin with ConnectionGuard (returns the connection on dtor, closes and discards it if poisoned) and ResultGuard (frees MYSQL_RES on dtor). Connections that hit a query error or exception are now poisoned instead of being returned to the pool in a possibly-broken state, and MYSQL_RES can no longer leak on a throwing result conversion.

5. Session: bounded LRU plan cache

   Plan cache used to be an unordered_map with no eviction, growing forever for high-QPS workloads with varied SQL. Replace with a std::list + hash map LRU bounded at 1024 entries by default. Cache size is configurable via set_plan_cache_max_size(); a size of 0 disables caching.

6. ThreadPool: document the exception-safety invariant

   The submit() path wraps callables in std::packaged_task, which catches exceptions and stores them in the future for later get(). The agent review claimed worker threads could die silently on a thrown task -- that's not true today, but it would become true if a future change queued raw callables. Add a comment locking in the invariant so nobody breaks it accidentally.

New tests:
- tests/test_datetime_format.cpp: 19 tests covering parse/format round-trips for date, time, and datetime, including dates before the epoch and a 200-year sweep validating days_to_ymd against days_since_epoch.
- tests/test_operators.cpp: 4 new tests for SetOp column-count rejection (UNION ALL and INTERSECT) and JoinOp RIGHT/FULL rejection.
- tests/test_session.cpp: 3 new tests for plan cache size, LRU eviction, and disabled-cache mode.

Note: a separate review claim that mysql_server.cpp:711 contained a "sequence number double-increment bug" was investigated and is incorrect. The MySQL protocol requires server responses to use client_seq + 1, and the increment at line 711 correctly advances from the client's seq to the server's first response seq. No change made there.
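Item 4 of the message describes a guard that returns a connection on destruction and discards it when poisoned. The ThreadSafeMultiRemoteExecutor changes are not among the files shown here, so the following is only a minimal sketch of that pattern under assumptions: PoolSketch, ConnectionGuardSketch, and run_query_sketch are hypothetical names, and an int stands in for a real connection handle.

```cpp
#include <stdexcept>

// Hypothetical pool that only counts what happens to connections.
struct PoolSketch {
    int returned = 0;   // connections handed back for reuse
    int discarded = 0;  // connections closed and dropped
    int checkout() { return 1; }               // pretend connection handle
    void checkin(int /*conn*/) { ++returned; }
    void discard(int /*conn*/) { ++discarded; }
};

// Returns the connection on destruction, or discards it if poisoned.
class ConnectionGuardSketch {
public:
    explicit ConnectionGuardSketch(PoolSketch& pool)
        : pool_(pool), conn_(pool.checkout()) {}
    ~ConnectionGuardSketch() {
        if (poisoned_) pool_.discard(conn_);
        else pool_.checkin(conn_);
    }
    ConnectionGuardSketch(const ConnectionGuardSketch&) = delete;
    ConnectionGuardSketch& operator=(const ConnectionGuardSketch&) = delete;

    void poison() { poisoned_ = true; }
    int get() const { return conn_; }

private:
    PoolSketch& pool_;
    int conn_;
    bool poisoned_ = false;
};

// Runs a pretend query; `fail` simulates a query error.
void run_query_sketch(PoolSketch& pool, bool fail) {
    ConnectionGuardSketch guard(pool);
    try {
        if (fail) throw std::runtime_error("query failed");
    } catch (...) {
        guard.poison();  // connection state is unknown: do not reuse it
        throw;
    }
}
```

Because the destructor decides between checkin and discard, every exit path from run_query_sketch, including the rethrow, leaves the pool in a consistent state without explicit cleanup code.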
1 parent f820e73 commit 9982885

13 files changed

Lines changed: 681 additions & 29 deletions

Makefile

Lines changed: 2 additions & 1 deletion
@@ -79,7 +79,8 @@ TEST_SRCS = $(TEST_DIR)/test_main.cpp \
             $(TEST_DIR)/test_single_backend_txn.cpp \
             $(TEST_DIR)/test_distributed_txn.cpp \
             $(TEST_DIR)/test_window.cpp \
-            $(TEST_DIR)/test_cte.cpp
+            $(TEST_DIR)/test_cte.cpp \
+            $(TEST_DIR)/test_datetime_format.cpp
 TEST_OBJS = $(TEST_SRCS:.cpp=.o)
 TEST_TARGET = $(PROJECT_ROOT)/run_tests

include/sql_engine/datetime_parse.h

Lines changed: 17 additions & 0 deletions
@@ -2,6 +2,7 @@
 #define SQL_ENGINE_DATETIME_PARSE_H

 #include <cstdint>
+#include <cstddef>

 namespace sql_engine {
 namespace datetime_parse {
@@ -19,6 +20,22 @@ int64_t parse_time(const char* s);
 // Handles leap years correctly.
 int32_t days_since_epoch(int year, int month, int day);

+// Calendar math: decompose days since 1970-01-01 into year/month/day.
+// Handles leap years and dates before the epoch correctly.
+void days_to_ymd(int32_t days, int& year, int& month, int& day);
+
+// Format days since epoch as "YYYY-MM-DD" into buf (needs >= 11 bytes including NUL).
+// Returns number of bytes written (excluding NUL).
+size_t format_date(int32_t days, char* buf, size_t buf_len);
+
+// Format microseconds since midnight as "HH:MM:SS" or "HH:MM:SS.uuuuuu".
+// Returns number of bytes written (excluding NUL).
+size_t format_time(int64_t us_since_midnight, char* buf, size_t buf_len);
+
+// Format microseconds since epoch as "YYYY-MM-DD HH:MM:SS[.uuuuuu]".
+// Returns number of bytes written (excluding NUL).
+size_t format_datetime(int64_t us_since_epoch, char* buf, size_t buf_len);
+
 } // namespace datetime_parse
 } // namespace sql_engine

include/sql_engine/operators/hash_join_op.h

Lines changed: 13 additions & 0 deletions
@@ -9,6 +9,7 @@
 #include <functional>
 #include <vector>
 #include <unordered_map>
+#include <stdexcept>

 namespace sql_engine {

@@ -33,6 +34,18 @@ class HashJoinOperator : public Operator {
           arena_(arena) {}

     void open() override {
+        // Only INNER and LEFT equi-joins are actually supported below.
+        // RIGHT/FULL/CROSS would silently fall through to INNER logic and
+        // return wrong results. Reject explicitly. (CROSS also doesn't
+        // belong here because it's not an equi-join; PlanExecutor's
+        // build_join should never route it to the hash join operator.)
+        if (join_type_ != JOIN_INNER && join_type_ != JOIN_LEFT) {
+            throw std::runtime_error(
+                "join type not supported by HashJoinOperator "
+                "(only INNER and LEFT equi-joins are implemented; "
+                "RIGHT, FULL and CROSS joins are not yet supported)");
+        }
+
         // Build phase: consume right side into hash table keyed by join column
         hash_table_.clear();
         right_->open();

include/sql_engine/operators/join_op.h

Lines changed: 12 additions & 0 deletions
@@ -6,6 +6,7 @@
 #include "sql_engine/catalog.h"
 #include "sql_engine/plan_node.h"
 #include "sql_parser/arena.h"
+#include <stdexcept>
 #include <functional>
 #include <vector>

@@ -29,6 +30,17 @@ class NestedLoopJoinOperator : public Operator {
           functions_(functions), arena_(arena) {}

     void open() override {
+        // Only INNER, LEFT and CROSS joins are actually implemented below.
+        // RIGHT/FULL would previously fall through to the INNER code path,
+        // silently producing wrong results. Reject them explicitly instead
+        // of corrupting query answers.
+        if (join_type_ != JOIN_INNER && join_type_ != JOIN_LEFT && join_type_ != JOIN_CROSS) {
+            throw std::runtime_error(
+                "join type not supported by NestedLoopJoinOperator "
+                "(only INNER, LEFT, CROSS are implemented; "
+                "RIGHT and FULL joins are not yet supported)");
+        }
+
         // Materialize right side
         right_->open();
         right_rows_.clear();
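Both join operators now apply the same fail-fast rule in open(): accept only the join kinds the operator implements and throw for everything else. A standalone sketch of that rule, assuming a hypothetical JoinKindSketch enum and helper name (the real code compares join_type_ against JOIN_INNER, JOIN_LEFT, and JOIN_CROSS constants), might look like this:

```cpp
#include <stdexcept>
#include <string>

// Hypothetical join-kind enum used only for this illustration.
enum class JoinKindSketch { Inner, Left, Right, Full, Cross };

// Throws std::runtime_error unless the operator implements the requested kind.
// operator_handles_cross is true for a nested-loop style operator and false
// for a hash join, which needs an equality key and so cannot serve CROSS.
void require_supported_join(JoinKindSketch kind,
                            bool operator_handles_cross,
                            const std::string& operator_name) {
    const bool supported =
        kind == JoinKindSketch::Inner ||
        kind == JoinKindSketch::Left ||
        (operator_handles_cross && kind == JoinKindSketch::Cross);
    if (!supported) {
        throw std::runtime_error("join type not supported by " + operator_name);
    }
}
```

Checking before any child operator is opened means a rejected query fails without consuming input or allocating the build side.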

include/sql_engine/operators/set_op_op.h

Lines changed: 35 additions & 2 deletions
@@ -8,6 +8,7 @@
 #include <string>
 #include <vector>
 #include <future>
+#include <stdexcept>

 namespace sql_engine {

@@ -37,12 +38,14 @@ class SetOpOperator : public Operator {
         }
         reading_left_ = true;
         seen_.clear();
+        expected_col_count_ = -1;

         if (op_ == SET_OP_INTERSECT || op_ == SET_OP_EXCEPT) {
             // Materialize right side into a set
             right_set_.clear();
             Row r{};
             while (right_->next(r)) {
+                check_col_count(r);
                 right_set_.insert(row_key(r));
             }
             right_->close();
@@ -63,6 +66,7 @@ class SetOpOperator : public Operator {
                     if (!got) return false;
                 }
                 if (got) {
+                    check_col_count(out);
                     std::string key = row_key(out);
                     if (seen_.insert(key).second) return true;
                 }
@@ -72,15 +76,23 @@ class SetOpOperator : public Operator {
         if (op_ == SET_OP_UNION && all_) {
             // UNION ALL: yield left then right
             if (reading_left_) {
-                if (left_->next(out)) return true;
+                if (left_->next(out)) {
+                    check_col_count(out);
+                    return true;
+                }
                 reading_left_ = false;
             }
-            return right_->next(out);
+            if (right_->next(out)) {
+                check_col_count(out);
+                return true;
+            }
+            return false;
         }

         if (op_ == SET_OP_INTERSECT) {
             // Yield left rows that also appear in right
             while (left_->next(out)) {
+                check_col_count(out);
                 std::string key = row_key(out);
                 if (right_set_.count(key)) {
                     if (all_ || seen_.insert(key).second)
@@ -93,6 +105,7 @@ class SetOpOperator : public Operator {
         if (op_ == SET_OP_EXCEPT) {
             // Yield left rows that don't appear in right
             while (left_->next(out)) {
+                check_col_count(out);
                 std::string key = row_key(out);
                 if (!right_set_.count(key)) {
                     if (all_ || seen_.insert(key).second)
@@ -122,6 +135,26 @@ class SetOpOperator : public Operator {
     bool reading_left_ = true;
     std::unordered_set<std::string> seen_;
     std::unordered_set<std::string> right_set_;
+    // Column count established by the first row we see. Used to detect
+    // schema mismatches between the two sides of a UNION/INTERSECT/EXCEPT --
+    // which the SQL standard says must have the same number of columns.
+    // Previously, mismatched column counts silently produced wrong results
+    // because row_key() would iterate a different number of values on each
+    // side. Now we throw a clear error at execution time.
+    int16_t expected_col_count_ = -1;
+
+    void check_col_count(const Row& row) {
+        if (expected_col_count_ < 0) {
+            expected_col_count_ = static_cast<int16_t>(row.column_count);
+            return;
+        }
+        if (row.column_count != static_cast<uint16_t>(expected_col_count_)) {
+            throw std::runtime_error(
+                "set operation column count mismatch: expected " +
+                std::to_string(expected_col_count_) + " columns, got " +
+                std::to_string(row.column_count));
+        }
+    }

     static std::string row_key(const Row& row) {
         std::string key;
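The check above latches the column count of the first row seen from either side and rejects any later row that differs. A self-contained sketch of the same idea follows. RowSketch and ColumnCountChecker are hypothetical names, and the sketch deliberately swaps the diff's int16_t sentinel of -1 for a separate bool flag, on the assumption that column_count is a uint16_t whose full range should stay representable.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the engine's Row; only the field used here.
struct RowSketch { uint16_t column_count; };

class ColumnCountChecker {
public:
    // Forget the latched count, e.g. when the operator is re-opened.
    void reset() { have_expected_ = false; }

    // The first row establishes the expected count; later rows must match it.
    void check(const RowSketch& row) {
        if (!have_expected_) {
            expected_ = row.column_count;
            have_expected_ = true;
            return;
        }
        if (row.column_count != expected_) {
            throw std::runtime_error(
                "set operation column count mismatch: expected " +
                std::to_string(expected_) + " columns, got " +
                std::to_string(row.column_count));
        }
    }

private:
    bool have_expected_ = false;
    uint16_t expected_ = 0;
};
```

Because the check runs per row at execution time, a mismatch is only detected once both sides have produced at least one row; an empty side never triggers it.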

include/sql_engine/session.h

Lines changed: 51 additions & 11 deletions
@@ -19,6 +19,7 @@
 #include <cstring>
 #include <string>
 #include <unordered_map>
+#include <list>
 #include <memory>

 namespace sql_engine {
@@ -72,17 +73,25 @@ class Session {
         pool_ = pool;
     }

+    // Configure the maximum number of cached plans. When the cache is full,
+    // the least-recently-used entry is evicted on insert. Default is 1024.
+    // Setting to 0 disables caching entirely.
+    void set_plan_cache_max_size(size_t n) { plan_cache_max_size_ = n; }
+    size_t plan_cache_size() const { return plan_cache_.size(); }
+
     // Execute a SELECT query. Returns a ResultSet.
     // Uses plan caching: repeated identical SQL strings skip parse/plan/optimize/distribute.
     ResultSet execute_query(const char* sql, size_t len) {
         // Check plan cache first
         std::string sql_key(sql, len);
         auto cache_it = plan_cache_.find(sql_key);
         if (cache_it != plan_cache_.end()) {
-            // Cache hit: reuse the cached plan. Use exec_arena_ for per-query
-            // allocations (rows, operator internals). Reset it each time.
+            // Cache hit: move this entry to the front of the LRU list.
+            plan_cache_order_.splice(plan_cache_order_.begin(),
+                                     plan_cache_order_,
+                                     cache_it->second);
             exec_arena_.reset();
-            auto& entry = cache_it->second;
+            auto& entry = *cache_it->second;
             PlanExecutor<D> executor(functions_, catalog_, exec_arena_);
             wire_executor(executor);
             return executor.execute(entry.plan);
@@ -115,11 +124,10 @@ class Session {
         wire_executor(executor);
         ResultSet rs = executor.execute(plan);

-        // Cache the plan (parser arena keeps plan tree and strings alive)
-        CachedPlan entry;
-        entry.parser = std::move(cached_parser);
-        entry.plan = plan;
-        plan_cache_.emplace(std::move(sql_key), std::move(entry));
+        // Cache the plan, enforcing the LRU bound. The parser arena is kept
+        // alive via unique_ptr stored in the CachedPlan so all plan/AST
+        // string pointers remain valid for subsequent cache hits.
+        insert_into_plan_cache(std::move(sql_key), std::move(cached_parser), plan);

         return rs;
     }
@@ -246,13 +254,45 @@ class Session {
     // Externally owned; set via set_thread_pool(). Shared across sessions.
     ThreadPool* pool_ = nullptr;

-    // Plan cache: maps SQL string → cached plan + parser arena.
-    // The parser's arena keeps all AST/plan string pointers valid.
+    // Plan cache: bounded LRU keyed by SQL string. Each entry owns the
+    // parser whose arena keeps the plan tree and all AST/plan string
+    // pointers alive for as long as the entry stays in the cache.
+    //
+    // Implementation: a list keeps insertion/use order (front = most
+    // recently used) and a hash map maps each SQL string to its iterator
+    // in the list, giving O(1) lookup, O(1) move-to-front on hit, and
+    // O(1) eviction of the LRU entry on insert.
     struct CachedPlan {
+        std::string key;
         std::unique_ptr<sql_parser::Parser<D>> parser;
         PlanNode* plan;
     };
-    std::unordered_map<std::string, CachedPlan> plan_cache_;
+    using CacheList = std::list<CachedPlan>;
+    using CacheIter = typename CacheList::iterator;
+    CacheList plan_cache_order_;
+    std::unordered_map<std::string, CacheIter> plan_cache_;
+    size_t plan_cache_max_size_ = 1024;
+
+    void insert_into_plan_cache(std::string key,
+                                std::unique_ptr<sql_parser::Parser<D>> parser,
+                                PlanNode* plan) {
+        if (plan_cache_max_size_ == 0) return; // caching disabled
+        // Evict LRU entries until we're within budget. We evict before insert
+        // so the new entry counts against the cap and we never exceed it.
+        while (plan_cache_.size() >= plan_cache_max_size_ && !plan_cache_order_.empty()) {
+            const std::string& victim_key = plan_cache_order_.back().key;
+            plan_cache_.erase(victim_key);
+            plan_cache_order_.pop_back();
+        }
+        CachedPlan entry;
+        entry.key = std::move(key);
+        entry.parser = std::move(parser);
+        entry.plan = plan;
+        plan_cache_order_.push_front(std::move(entry));
+        // The map stores the iterator and a copy of the key (so the lookup
+        // string and the entry's owned key both remain valid through moves).
+        plan_cache_[plan_cache_order_.front().key] = plan_cache_order_.begin();
+    }

     void wire_executor(PlanExecutor<D>& executor) {
         for (auto& kv : sources_)
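The list-plus-map layout described in the comment above can be tried in isolation. The sketch below is a simplified stand-in, not the Session code: LruCacheSketch is a hypothetical class, the cached value is a plain int instead of a parser and plan pointer, and a put() on an existing key overwrites it, a case the diff's insert path never reaches because it only runs after a cache miss.

```cpp
#include <cstddef>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

class LruCacheSketch {
public:
    explicit LruCacheSketch(std::size_t max_size) : max_size_(max_size) {}

    // Returns a pointer to the cached value, or nullptr on a miss.
    // A hit moves the entry to the front (most recently used).
    const int* get(const std::string& key) {
        auto it = index_.find(key);
        if (it == index_.end()) return nullptr;
        order_.splice(order_.begin(), order_, it->second);  // O(1), iterators stay valid
        return &it->second->second;
    }

    void put(const std::string& key, int value) {
        if (max_size_ == 0) return;  // caching disabled
        auto it = index_.find(key);
        if (it != index_.end()) {    // overwrite and refresh recency
            it->second->second = value;
            order_.splice(order_.begin(), order_, it->second);
            return;
        }
        // Evict from the back (least recently used) before inserting,
        // so the size never exceeds max_size_.
        while (index_.size() >= max_size_ && !order_.empty()) {
            index_.erase(order_.back().first);
            order_.pop_back();
        }
        order_.emplace_front(key, value);
        index_[key] = order_.begin();
    }

    std::size_t size() const { return index_.size(); }

private:
    using Entry = std::pair<std::string, int>;
    std::list<Entry> order_;  // front = most recently used
    std::unordered_map<std::string, std::list<Entry>::iterator> index_;
    std::size_t max_size_;
};
```

std::list is what makes storing iterators in the map safe: splice and insertions elsewhere in the list never invalidate iterators to surviving nodes, which would not hold for a vector or deque.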

include/sql_engine/thread_pool.h

Lines changed: 7 additions & 0 deletions
@@ -38,6 +38,13 @@ class ThreadPool {
                         task = std::move(tasks_.front());
                         tasks_.pop();
                     }
+                    // Exception safety: submit() wraps callables in
+                    // std::packaged_task, which catches any exception and
+                    // stores it in the shared state to be rethrown by
+                    // future::get(). `task()` therefore does not throw, so
+                    // the worker loop cannot die on a user exception. If
+                    // submit() is ever changed to queue raw callables, this
+                    // must be wrapped in try { task(); } catch (...) { log; }.
                     task();
                 }
             });
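The invariant in that comment can be observed without a pool at all. The sketch below assumes submit() queues a std::function<void()> that invokes a shared std::packaged_task, which is a common way to write it but is not shown in this diff; wrap_and_run is a hypothetical helper that runs the queued callable inline instead of on a worker thread.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Wraps fn in a packaged_task, invokes it the way a worker loop would call
// task(), and returns the future. The invocation itself never throws: an
// exception escaping fn is captured in the shared state instead.
std::future<int> wrap_and_run(std::function<int()> fn) {
    auto task = std::make_shared<std::packaged_task<int()>>(std::move(fn));
    std::future<int> result = task->get_future();
    std::function<void()> queued = [task] { (*task)(); };  // what the queue would hold
    queued();                                              // worker-side call
    return result;
}
```

A caller only sees the failure when it calls get() on the returned future, which rethrows the stored exception. A future that is dropped without get() discards the error silently, so callers that care about failures need to keep and check their futures.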
