Skip to content

Commit f820e73

Browse files
committed
perf: thread pool for parallel shard I/O + plan caching + connection pool fixes
Three performance optimizations to improve distributed query throughput:

1. Thread pool (thread_pool.h): Pre-spawned worker threads replace std::async for parallel shard dispatch. ~1-2us dispatch overhead vs ~200us for thread creation. Shared across all sessions in the stress test.

2. Plan caching (session.h): Repeated identical SQL strings skip the full parse -> plan -> optimize -> distribute pipeline. Cached plans reuse the parser's arena for plan tree validity across calls.

3. Connection pool improvements:
   - Removed mysql_ping() on checkout (was adding ~200us RTT per checkout)
   - Per-backend mutexes instead of global lock (eliminates cross-shard contention)
   - Direct mysql_real_query with StringRef (avoids std::string copy)

Results vs Vitess (peak QPS):
- aggregation: 44.5K vs 34.5K (+29%, WIN)
- sort_limit: 51.6K vs 41.5K (+24%, WIN)
- count_star: 10.6K vs 47K (still behind -- bottleneck is MySQL connection overhead)
1 parent 937ffbd commit f820e73

9 files changed

Lines changed: 281 additions & 100 deletions

File tree

include/sql_engine/connection_pool.h

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
// Thread-safe connection pool for MySQL backends.
55
//
6-
// Each backend name maps to a stack of idle MYSQL* handles. Threads check out
7-
// a handle (creating one on demand), use it, and check it back in. The pool
8-
// grows dynamically -- there is no hard cap -- so N concurrent calls to the
9-
// same backend will use N connections.
6+
// Each backend name maps to a stack of idle MYSQL* handles with its own mutex.
7+
// Threads check out a handle (creating one on demand), use it, and check it
8+
// back in. Per-backend locking means concurrent queries to different backends
9+
// never contend on the same mutex.
1010

1111
#include "sql_engine/backend_config.h"
1212
#include <mysql/mysql.h>
@@ -15,6 +15,7 @@
1515
#include <vector>
1616
#include <string>
1717
#include <stdexcept>
18+
#include <memory>
1819

1920
namespace sql_engine {
2021

@@ -23,60 +24,67 @@ class ConnectionPool {
2324
ConnectionPool() = default;
2425

2526
~ConnectionPool() {
26-
std::lock_guard<std::mutex> lk(mu_);
27-
for (auto& kv : idle_) {
28-
for (MYSQL* c : kv.second) {
27+
for (auto& kv : backends_) {
28+
auto& be = *kv.second;
29+
std::lock_guard<std::mutex> lk(be.mu);
30+
for (MYSQL* c : be.idle) {
2931
if (c) mysql_close(c);
3032
}
3133
}
32-
idle_.clear();
3334
}
3435

3536
// Register a backend. Not synchronized: the backend map is modified without a lock, so every backend must be registered before any thread calls checkout() or checkin().
3637
void add_backend(const BackendConfig& config) {
37-
std::lock_guard<std::mutex> lk(mu_);
38-
configs_[config.name] = config;
38+
auto be = std::make_unique<Backend>();
39+
be->config = config;
40+
backends_[config.name] = std::move(be);
3941
}
4042

41-
// Obtain a connection for the named backend. Creates a new one if none is
42-
// idle. The caller MUST call checkin() when done.
43+
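// Obtain a connection for the named backend: reuses an idle handle if one exists (without a liveness check), otherwise opens a new one. The caller must call checkin() when done.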
4344
MYSQL* checkout(const std::string& backend) {
44-
std::lock_guard<std::mutex> lk(mu_);
45-
auto& stack = idle_[backend];
46-
if (!stack.empty()) {
47-
MYSQL* c = stack.back();
48-
stack.pop_back();
49-
// Quick liveness check (non-blocking).
50-
if (mysql_ping(c) != 0) {
51-
mysql_close(c);
52-
return create_connection(backend);
45+
Backend& be = get_backend(backend);
46+
{
47+
std::lock_guard<std::mutex> lk(be.mu);
48+
if (!be.idle.empty()) {
49+
MYSQL* c = be.idle.back();
50+
be.idle.pop_back();
51+
return c;
5352
}
54-
return c;
5553
}
56-
return create_connection(backend);
54+
// Create connection outside the lock (mysql_real_connect blocks)
55+
return create_connection(be);
5756
}
5857

5958
// Return a connection to the pool for reuse.
6059
void checkin(const std::string& backend, MYSQL* conn) {
6160
if (!conn) return;
62-
std::lock_guard<std::mutex> lk(mu_);
63-
idle_[backend].push_back(conn);
61+
Backend& be = get_backend(backend);
62+
std::lock_guard<std::mutex> lk(be.mu);
63+
be.idle.push_back(conn);
6464
}
6565

6666
private:
67-
std::mutex mu_;
68-
std::unordered_map<std::string, BackendConfig> configs_;
69-
std::unordered_map<std::string, std::vector<MYSQL*>> idle_;
70-
71-
// Must be called with mu_ held.
72-
MYSQL* create_connection(const std::string& backend) {
73-
auto it = configs_.find(backend);
74-
if (it == configs_.end()) {
75-
throw std::runtime_error("ConnectionPool: unknown backend: " + backend);
67+
struct Backend {
68+
BackendConfig config;
69+
std::mutex mu;
70+
std::vector<MYSQL*> idle;
71+
};
72+
73+
std::unordered_map<std::string, std::unique_ptr<Backend>> backends_;
74+
75+
Backend& get_backend(const std::string& name) {
76+
auto it = backends_.find(name);
77+
if (it == backends_.end()) {
78+
throw std::runtime_error("ConnectionPool: unknown backend: " + name);
7679
}
77-
const BackendConfig& cfg = it->second;
80+
return *it->second;
81+
}
82+
83+
// Must NOT be called with be.mu held (mysql_real_connect blocks).
84+
static MYSQL* create_connection(Backend& be) {
85+
const BackendConfig& cfg = be.config;
7886
MYSQL* c = mysql_init(nullptr);
79-
if (!c) throw std::runtime_error("mysql_init failed for " + backend);
87+
if (!c) throw std::runtime_error("mysql_init failed for " + cfg.name);
8088

8189
unsigned int timeout = 5;
8290
mysql_options(c, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
@@ -86,7 +94,7 @@ class ConnectionPool {
8694
cfg.port, nullptr, 0)) {
8795
std::string err = mysql_error(c);
8896
mysql_close(c);
89-
throw std::runtime_error("ConnectionPool connect failed for " + backend + ": " + err);
97+
throw std::runtime_error("ConnectionPool connect failed for " + cfg.name + ": " + err);
9098
}
9199
mysql_set_character_set(c, "utf8mb4");
92100
return c;

include/sql_engine/operators/merge_aggregate_op.h

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/operator.h"
55
#include "sql_engine/value.h"
66
#include "sql_engine/row.h"
7+
#include "sql_engine/thread_pool.h"
78
#include "sql_parser/arena.h"
89
#include <vector>
910
#include <unordered_map>
@@ -32,12 +33,14 @@ class MergeAggregateOperator : public Operator {
3233
const uint8_t* merge_ops,
3334
uint16_t merge_op_count,
3435
sql_parser::Arena& arena,
35-
bool parallel_open = false)
36+
bool parallel_open = false,
37+
ThreadPool* pool = nullptr)
3638
: children_(std::move(children)),
3739
group_key_count_(group_key_count),
3840
merge_op_count_(merge_op_count),
3941
arena_(arena),
40-
parallel_open_(parallel_open)
42+
parallel_open_(parallel_open),
43+
pool_(pool)
4144
{
4245
merge_ops_.assign(merge_ops, merge_ops + merge_op_count);
4346
}
@@ -48,30 +51,27 @@ class MergeAggregateOperator : public Operator {
4851
result_idx_ = 0;
4952

5053
if (parallel_open_ && children_.size() > 1) {
51-
// Parallel execution (#26): open all children concurrently and
52-
// materialize their rows. Each child is typically a RemoteScan
53-
// that performs a network call in open(); launching concurrently
54-
// reduces wall-clock time from O(N*latency) to O(latency).
55-
// Each future fully opens+consumes its child so the remote
56-
// executor call is contained within one thread.
57-
//
58-
// NOTE: requires a thread-safe RemoteExecutor implementation
59-
// (production executors with independent per-backend connections
60-
// are safe; the unit-test mock may not be).
54+
// Parallel execution: open all children concurrently and
55+
// materialize their rows. Uses thread pool when available
56+
// (~1-2us dispatch) vs std::async (~200us thread creation).
6157
std::vector<std::vector<Row>> child_rows(children_.size());
6258
{
6359
std::vector<std::future<void>> futures;
6460
futures.reserve(children_.size());
6561
for (size_t ci = 0; ci < children_.size(); ++ci) {
66-
futures.push_back(std::async(std::launch::async,
67-
[this, ci, &child_rows]{
68-
children_[ci]->open();
69-
Row row{};
70-
while (children_[ci]->next(row)) {
71-
child_rows[ci].push_back(row);
72-
}
73-
children_[ci]->close();
74-
}));
62+
auto launcher = [this, ci, &child_rows]{
63+
children_[ci]->open();
64+
Row row{};
65+
while (children_[ci]->next(row)) {
66+
child_rows[ci].push_back(row);
67+
}
68+
children_[ci]->close();
69+
};
70+
if (pool_) {
71+
futures.push_back(pool_->submit(std::move(launcher)));
72+
} else {
73+
futures.push_back(std::async(std::launch::async, std::move(launcher)));
74+
}
7575
}
7676
for (auto& f : futures) f.get();
7777
}
@@ -155,6 +155,7 @@ class MergeAggregateOperator : public Operator {
155155
uint16_t merge_op_count_;
156156
sql_parser::Arena& arena_;
157157
bool parallel_open_;
158+
ThreadPool* pool_ = nullptr;
158159

159160
struct GroupState {
160161
std::vector<Value> group_values;

include/sql_engine/operators/merge_sort_op.h

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/operator.h"
55
#include "sql_engine/value.h"
66
#include "sql_engine/row.h"
7+
#include "sql_engine/thread_pool.h"
78
#include <vector>
89
#include <queue>
910
#include <functional>
@@ -21,9 +22,10 @@ class MergeSortOperator : public Operator {
2122
const uint16_t* sort_col_indices,
2223
const uint8_t* directions,
2324
uint16_t key_count,
24-
bool parallel_open = false)
25+
bool parallel_open = false,
26+
ThreadPool* pool = nullptr)
2527
: children_(std::move(children)), key_count_(key_count),
26-
parallel_open_(parallel_open)
28+
parallel_open_(parallel_open), pool_(pool)
2729
{
2830
sort_cols_.assign(sort_col_indices, sort_col_indices + key_count);
2931
directions_.assign(directions, directions + key_count);
@@ -34,24 +36,25 @@ class MergeSortOperator : public Operator {
3436
has_row_.assign(children_.size(), false);
3537

3638
if (parallel_open_ && children_.size() > 1) {
37-
// Parallel execution (#26): open all children concurrently and
38-
// fetch their first row. Each child is typically a RemoteScan
39-
// that performs a network call in open(); launching concurrently
40-
// reduces wall-clock time from O(N*latency) to O(latency).
41-
//
42-
// NOTE: requires a thread-safe RemoteExecutor implementation.
39+
// Parallel execution: open all children concurrently and
40+
// fetch their first row. Uses thread pool when available
41+
// (~1-2us dispatch) vs std::async (~200us thread creation).
4342
std::vector<std::future<void>> futures;
4443
futures.reserve(children_.size());
4544
for (size_t i = 0; i < children_.size(); ++i) {
46-
futures.push_back(std::async(std::launch::async,
47-
[this, i]{
48-
children_[i]->open();
49-
Row row{};
50-
if (children_[i]->next(row)) {
51-
heads_[i] = row;
52-
has_row_[i] = true;
53-
}
54-
}));
45+
auto launcher = [this, i]{
46+
children_[i]->open();
47+
Row row{};
48+
if (children_[i]->next(row)) {
49+
heads_[i] = row;
50+
has_row_[i] = true;
51+
}
52+
};
53+
if (pool_) {
54+
futures.push_back(pool_->submit(std::move(launcher)));
55+
} else {
56+
futures.push_back(std::async(std::launch::async, std::move(launcher)));
57+
}
5558
}
5659
for (auto& f : futures) f.get();
5760
} else {
@@ -115,6 +118,7 @@ class MergeSortOperator : public Operator {
115118
std::vector<uint8_t> directions_;
116119
uint16_t key_count_;
117120
bool parallel_open_;
121+
ThreadPool* pool_ = nullptr;
118122

119123
std::vector<Row> heads_;
120124
std::vector<bool> has_row_;

include/sql_engine/operators/set_op_op.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include "sql_engine/operator.h"
55
#include "sql_engine/plan_node.h"
6+
#include "sql_engine/thread_pool.h"
67
#include <unordered_set>
78
#include <string>
89
#include <vector>
@@ -13,15 +14,19 @@ namespace sql_engine {
1314
class SetOpOperator : public Operator {
1415
public:
1516
SetOpOperator(Operator* left, Operator* right, uint8_t op, bool all,
16-
bool parallel_open = false)
17+
bool parallel_open = false, ThreadPool* pool = nullptr)
1718
: left_(left), right_(right), op_(op), all_(all),
18-
parallel_open_(parallel_open) {}
19+
parallel_open_(parallel_open), pool_(pool) {}
1920

2021
void open() override {
21-
if (parallel_open_) {
22-
// Parallel open (#26): launch left and right opens concurrently.
23-
// This is beneficial when both children are RemoteScan operators
24-
// (each performing a network round-trip in open()).
22+
if (parallel_open_ && pool_) {
23+
// Thread-pool parallel open: ~1-2us dispatch vs ~200us for std::async
24+
auto fl = pool_->submit([this]{ left_->open(); });
25+
auto fr = pool_->submit([this]{ right_->open(); });
26+
fl.get();
27+
fr.get();
28+
} else if (parallel_open_) {
29+
// Fallback: std::async when no pool available
2530
auto fl = std::async(std::launch::async, [this]{ left_->open(); });
2631
auto fr = std::async(std::launch::async, [this]{ right_->open(); });
2732
fl.get();
@@ -113,6 +118,7 @@ class SetOpOperator : public Operator {
113118
uint8_t op_;
114119
bool all_;
115120
bool parallel_open_;
121+
ThreadPool* pool_ = nullptr;
116122
bool reading_left_ = true;
117123
std::unordered_set<std::string> seen_;
118124
std::unordered_set<std::string> right_set_;

include/sql_engine/plan_executor.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "sql_engine/operators/merge_sort_op.h"
4747
#include "sql_engine/operators/window_op.h"
4848
#include "sql_engine/remote_executor.h"
49+
#include "sql_engine/thread_pool.h"
4950
#include "sql_engine/dml_result.h"
5051
#include "sql_engine/mutable_data_source.h"
5152
#include "sql_engine/catalog_resolver.h"
@@ -87,6 +88,12 @@ class PlanExecutor {
8788
parallel_open_enabled_ = enabled;
8889
}
8990

91+
// Set a thread pool for lightweight parallel dispatch in merge/set operators.
92+
// The pool must outlive this executor.
93+
void set_thread_pool(ThreadPool* pool) {
94+
pool_ = pool;
95+
}
96+
9097
// Access the subquery executor (for operators that need it)
9198
SubqueryExecutor<D>* subquery_executor() { return &subquery_exec_; }
9299

@@ -302,6 +309,7 @@ class PlanExecutor {
302309
std::vector<std::unique_ptr<Operator>> operators_;
303310
RemoteExecutor* remote_executor_ = nullptr;
304311
bool parallel_open_enabled_ = false;
312+
ThreadPool* pool_ = nullptr;
305313
DistributeFn distribute_fn_;
306314
SubqueryExecutor<D> subquery_exec_;
307315
sql_parser::Arena subquery_plan_arena_{65536, 1048576};
@@ -1024,7 +1032,8 @@ class PlanExecutor {
10241032
(node->left && node->left->type == PlanNodeType::REMOTE_SCAN &&
10251033
node->right && node->right->type == PlanNodeType::REMOTE_SCAN);
10261034
auto op = std::make_unique<SetOpOperator>(
1027-
left, right, node->set_op.op, node->set_op.all, parallel);
1035+
left, right, node->set_op.op, node->set_op.all, parallel,
1036+
parallel ? pool_ : nullptr);
10281037
Operator* ptr = op.get();
10291038
operators_.push_back(std::move(op));
10301039
return ptr;
@@ -1058,7 +1067,8 @@ class PlanExecutor {
10581067
node->merge_aggregate.merge_ops,
10591068
node->merge_aggregate.merge_op_count,
10601069
arena_,
1061-
parallel);
1070+
parallel,
1071+
parallel ? pool_ : nullptr);
10621072
Operator* ptr = op.get();
10631073
operators_.push_back(std::move(op));
10641074
return ptr;
@@ -1098,7 +1108,8 @@ class PlanExecutor {
10981108
sort_col_indices.data(),
10991109
sort_dirs.data(),
11001110
node->merge_sort.key_count,
1101-
parallel);
1111+
parallel,
1112+
parallel ? pool_ : nullptr);
11021113
Operator* ptr = op.get();
11031114
operators_.push_back(std::move(op));
11041115
return ptr;

0 commit comments

Comments
 (0)