Skip to content

Commit a69ded3

Browse files
committed
fix: remove arena from remote executors, use heap-owned ResultSet data (#33)
Remote executors (MySQL, PostgreSQL, Multi) no longer take an Arena& parameter. Instead, ResultSet owns its row value arrays and string data on the heap via owned_value_arrays and owned_strings, eliminating the arena memory leak under sustained load.
1 parent 2c3fbfc commit a69ded3

14 files changed

Lines changed: 92 additions & 61 deletions

include/sql_engine/multi_remote_executor.h

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@
55
#include "sql_engine/mysql_remote_executor.h"
66
#include "sql_engine/pgsql_remote_executor.h"
77
#include "sql_engine/backend_config.h"
8-
#include "sql_parser/arena.h"
98
#include "sql_parser/common.h"
109

1110
#include <unordered_map>
@@ -15,7 +14,7 @@ namespace sql_engine {
1514

1615
class MultiRemoteExecutor : public RemoteExecutor {
1716
public:
18-
explicit MultiRemoteExecutor(sql_parser::Arena& arena);
17+
MultiRemoteExecutor();
1918
~MultiRemoteExecutor() override;
2019

2120
void add_backend(const BackendConfig& config);

include/sql_engine/mysql_remote_executor.h

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
#include "sql_engine/backend_config.h"
66
#include "sql_engine/value.h"
77
#include "sql_engine/row.h"
8-
#include "sql_parser/arena.h"
8+
#include "sql_engine/result_set.h"
99

1010
#include <mysql/mysql.h>
1111
#include <unordered_map>
@@ -15,7 +15,7 @@ namespace sql_engine {
1515

1616
class MySQLRemoteExecutor : public RemoteExecutor {
1717
public:
18-
explicit MySQLRemoteExecutor(sql_parser::Arena& arena);
18+
MySQLRemoteExecutor();
1919
~MySQLRemoteExecutor() override;
2020

2121
void add_backend(const BackendConfig& config);
@@ -31,11 +31,11 @@ class MySQLRemoteExecutor : public RemoteExecutor {
3131
};
3232

3333
std::unordered_map<std::string, Connection> backends_;
34-
sql_parser::Arena& arena_;
3534

3635
Connection& get_or_connect(const std::string& name);
3736
ResultSet mysql_result_to_resultset(MYSQL_RES* res);
38-
Value mysql_field_to_value(const char* data, unsigned long length,
37+
Value mysql_field_to_value(ResultSet& rs, const char* data,
38+
unsigned long length,
3939
enum_field_types type, bool is_null);
4040
};
4141

include/sql_engine/pgsql_remote_executor.h

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
#include "sql_engine/backend_config.h"
66
#include "sql_engine/value.h"
77
#include "sql_engine/row.h"
8-
#include "sql_parser/arena.h"
8+
#include "sql_engine/result_set.h"
99

1010
#include <libpq-fe.h>
1111
#include <unordered_map>
@@ -15,7 +15,7 @@ namespace sql_engine {
1515

1616
class PgSQLRemoteExecutor : public RemoteExecutor {
1717
public:
18-
explicit PgSQLRemoteExecutor(sql_parser::Arena& arena);
18+
PgSQLRemoteExecutor();
1919
~PgSQLRemoteExecutor() override;
2020

2121
void add_backend(const BackendConfig& config);
@@ -31,11 +31,11 @@ class PgSQLRemoteExecutor : public RemoteExecutor {
3131
};
3232

3333
std::unordered_map<std::string, Connection> backends_;
34-
sql_parser::Arena& arena_;
3534

3635
Connection& get_or_connect(const std::string& name);
3736
ResultSet pg_result_to_resultset(PGresult* res);
38-
Value pg_field_to_value(const char* data, int length, Oid type, bool is_null);
37+
Value pg_field_to_value(ResultSet& rs, const char* data, int length,
38+
Oid type, bool is_null);
3939
};
4040

4141
} // namespace sql_engine

include/sql_engine/result_set.h

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@
44
#include "sql_engine/row.h"
55
#include <vector>
66
#include <string>
7+
#include <algorithm>
8+
#include <utility>
79

810
namespace sql_engine {
911

@@ -12,8 +14,62 @@ struct ResultSet {
1214
std::vector<std::string> column_names;
1315
uint16_t column_count = 0;
1416

17+
// Heap storage for row values and string data owned by this ResultSet.
18+
// Used by remote executors to avoid arena lifetime issues.
19+
std::vector<Value*> owned_value_arrays;
20+
std::vector<std::string> owned_strings;
21+
22+
ResultSet() = default;
23+
~ResultSet() {
24+
for (auto* arr : owned_value_arrays) ::operator delete(arr);
25+
}
26+
27+
// Move-only (owns heap arrays)
28+
ResultSet(ResultSet&& o) noexcept
29+
: rows(std::move(o.rows)),
30+
column_names(std::move(o.column_names)),
31+
column_count(o.column_count),
32+
owned_value_arrays(std::move(o.owned_value_arrays)),
33+
owned_strings(std::move(o.owned_strings)) {
34+
o.column_count = 0;
35+
}
36+
37+
ResultSet& operator=(ResultSet&& o) noexcept {
38+
if (this != &o) {
39+
for (auto* arr : owned_value_arrays) ::operator delete(arr);
40+
rows = std::move(o.rows);
41+
column_names = std::move(o.column_names);
42+
column_count = o.column_count;
43+
owned_value_arrays = std::move(o.owned_value_arrays);
44+
owned_strings = std::move(o.owned_strings);
45+
o.column_count = 0;
46+
}
47+
return *this;
48+
}
49+
50+
// Disable copy to prevent double-free of owned arrays
51+
ResultSet(const ResultSet&) = delete;
52+
ResultSet& operator=(const ResultSet&) = delete;
53+
1554
size_t row_count() const { return rows.size(); }
1655
bool empty() const { return rows.empty(); }
56+
57+
// Allocate a heap-owned row and append it to rows. Returns a reference
58+
// to the Row (which points into owned_value_arrays).
59+
Row& add_heap_row(uint16_t col_count) {
60+
Value* vals = static_cast<Value*>(::operator new(sizeof(Value) * col_count));
61+
for (uint16_t i = 0; i < col_count; ++i) vals[i] = value_null();
62+
owned_value_arrays.push_back(vals);
63+
rows.push_back(Row{vals, col_count});
64+
return rows.back();
65+
}
66+
67+
// Store a string in owned_strings and return a StringRef pointing into it.
68+
sql_parser::StringRef own_string(const char* data, uint32_t len) {
69+
owned_strings.emplace_back(data, len);
70+
const std::string& s = owned_strings.back();
71+
return sql_parser::StringRef{s.c_str(), static_cast<uint32_t>(s.size())};
72+
}
1773
};
1874

1975
} // namespace sql_engine

src/sql_engine/multi_remote_executor.cpp

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,8 +2,7 @@
22

33
namespace sql_engine {
44

5-
MultiRemoteExecutor::MultiRemoteExecutor(sql_parser::Arena& arena)
6-
: mysql_exec_(arena), pgsql_exec_(arena) {}
5+
MultiRemoteExecutor::MultiRemoteExecutor() {}
76

87
MultiRemoteExecutor::~MultiRemoteExecutor() {
98
disconnect_all();

src/sql_engine/mysql_remote_executor.cpp

Lines changed: 8 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,7 @@
66

77
namespace sql_engine {
88

9-
MySQLRemoteExecutor::MySQLRemoteExecutor(sql_parser::Arena& arena)
10-
: arena_(arena) {}
9+
MySQLRemoteExecutor::MySQLRemoteExecutor() {}
1110

1211
MySQLRemoteExecutor::~MySQLRemoteExecutor() {
1312
disconnect_all();
@@ -131,21 +130,20 @@ ResultSet MySQLRemoteExecutor::mysql_result_to_resultset(MYSQL_RES* res) {
131130
MYSQL_ROW mysql_row;
132131
while ((mysql_row = mysql_fetch_row(res)) != nullptr) {
133132
unsigned long* lengths = mysql_fetch_lengths(res);
134-
Row row = make_row(arena_, rs.column_count);
133+
Row& row = rs.add_heap_row(rs.column_count);
135134
for (unsigned int i = 0; i < num_fields; ++i) {
136135
bool is_null = (mysql_row[i] == nullptr);
137136
Value v = mysql_field_to_value(
138-
mysql_row[i], is_null ? 0 : lengths[i],
137+
rs, mysql_row[i], is_null ? 0 : lengths[i],
139138
fields[i].type, is_null);
140139
row.set(static_cast<uint16_t>(i), v);
141140
}
142-
rs.rows.push_back(row);
143141
}
144142
return rs;
145143
}
146144

147145
Value MySQLRemoteExecutor::mysql_field_to_value(
148-
const char* data, unsigned long length,
146+
ResultSet& rs, const char* data, unsigned long length,
149147
enum_field_types type, bool is_null) {
150148

151149
if (is_null) return value_null();
@@ -165,8 +163,7 @@ Value MySQLRemoteExecutor::mysql_field_to_value(
165163

166164
case MYSQL_TYPE_DECIMAL:
167165
case MYSQL_TYPE_NEWDECIMAL: {
168-
// Store as decimal (numeric string) in arena
169-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
166+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
170167
return value_decimal(s);
171168
}
172169

@@ -197,18 +194,18 @@ Value MySQLRemoteExecutor::mysql_field_to_value(
197194
case MYSQL_TYPE_TINY_BLOB:
198195
case MYSQL_TYPE_MEDIUM_BLOB:
199196
case MYSQL_TYPE_LONG_BLOB: {
200-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
197+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
201198
return value_bytes(s);
202199
}
203200

204201
case MYSQL_TYPE_JSON: {
205-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
202+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
206203
return value_json(s);
207204
}
208205

209206
default: {
210207
// STRING, VAR_STRING, ENUM, SET, etc. — treat as string
211-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
208+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
212209
return value_string(s);
213210
}
214211
}

src/sql_engine/pgsql_remote_executor.cpp

Lines changed: 8 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,7 @@
2929

3030
namespace sql_engine {
3131

32-
PgSQLRemoteExecutor::PgSQLRemoteExecutor(sql_parser::Arena& arena)
33-
: arena_(arena) {}
32+
PgSQLRemoteExecutor::PgSQLRemoteExecutor() {}
3433

3534
PgSQLRemoteExecutor::~PgSQLRemoteExecutor() {
3635
disconnect_all();
@@ -152,22 +151,21 @@ ResultSet PgSQLRemoteExecutor::pg_result_to_resultset(PGresult* res) {
152151
}
153152

154153
for (int r = 0; r < num_rows; ++r) {
155-
Row row = make_row(arena_, rs.column_count);
154+
Row& row = rs.add_heap_row(rs.column_count);
156155
for (int c = 0; c < num_fields; ++c) {
157156
bool is_null = PQgetisnull(res, r, c);
158157
Oid oid = PQftype(res, c);
159158
const char* data = PQgetvalue(res, r, c);
160159
int length = PQgetlength(res, r, c);
161-
Value v = pg_field_to_value(data, length, oid, is_null);
160+
Value v = pg_field_to_value(rs, data, length, oid, is_null);
162161
row.set(static_cast<uint16_t>(c), v);
163162
}
164-
rs.rows.push_back(row);
165163
}
166164
return rs;
167165
}
168166

169167
Value PgSQLRemoteExecutor::pg_field_to_value(
170-
const char* data, int length, Oid type, bool is_null) {
168+
ResultSet& rs, const char* data, int length, Oid type, bool is_null) {
171169

172170
if (is_null) return value_null();
173171

@@ -186,7 +184,7 @@ Value PgSQLRemoteExecutor::pg_field_to_value(
186184
return value_double(std::strtod(data, nullptr));
187185

188186
case NUMERICOID: {
189-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
187+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
190188
return value_string(s);
191189
}
192190

@@ -213,19 +211,19 @@ Value PgSQLRemoteExecutor::pg_field_to_value(
213211
}
214212

215213
case BYTEAOID: {
216-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
214+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
217215
return value_bytes(s);
218216
}
219217

220218
case JSONOID:
221219
case JSONBOID: {
222-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
220+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
223221
return value_json(s);
224222
}
225223

226224
default: {
227225
// TEXT, VARCHAR, BPCHAR, NAME, and everything else -- treat as string
228-
sql_parser::StringRef s = arena_.allocate_string(data, static_cast<uint32_t>(length));
226+
sql_parser::StringRef s = rs.own_string(data, static_cast<uint32_t>(length));
229227
return value_string(s);
230228
}
231229
}

tests/test_distributed_real.cpp

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -77,8 +77,7 @@ void register_test_catalog(InMemoryCatalog& catalog) {
7777
class DistributedRealMySQLTest : public ::testing::Test {
7878
protected:
7979
void SetUp() override {
80-
exec_arena_ = std::make_unique<Arena>(65536, 1048576);
81-
exec_ = std::make_unique<MySQLRemoteExecutor>(*exec_arena_);
80+
exec_ = std::make_unique<MySQLRemoteExecutor>();
8281

8382
BackendConfig cfg;
8483
cfg.name = "shard_1";
@@ -129,7 +128,6 @@ class DistributedRealMySQLTest : public ::testing::Test {
129128
return executor.execute(dist_plan);
130129
}
131130

132-
std::unique_ptr<Arena> exec_arena_;
133131
std::unique_ptr<MySQLRemoteExecutor> exec_;
134132
InMemoryCatalog catalog_;
135133
FunctionRegistry<Dialect::MySQL> functions_;
@@ -167,15 +165,13 @@ TEST_F(DistributedRealMySQLTest, SelectFromOrders) {
167165
class MultiExecutorTest : public ::testing::Test {
168166
protected:
169167
void SetUp() override {
170-
arena_ = std::make_unique<Arena>(65536, 1048576);
171-
exec_ = std::make_unique<MultiRemoteExecutor>(*arena_);
168+
exec_ = std::make_unique<MultiRemoteExecutor>();
172169
}
173170

174171
void TearDown() override {
175172
exec_->disconnect_all();
176173
}
177174

178-
std::unique_ptr<Arena> arena_;
179175
std::unique_ptr<MultiRemoteExecutor> exec_;
180176
};
181177

tests/test_mysql_executor.cpp

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
#include <gtest/gtest.h>
22
#include "sql_engine/mysql_remote_executor.h"
33
#include "sql_engine/backend_config.h"
4-
#include "sql_parser/arena.h"
54

65
#include <mysql/mysql.h>
76

@@ -41,18 +40,15 @@ sql_engine::BackendConfig make_mysql_config(const char* name = "test_mysql") {
4140
class MySQLExecutorTest : public ::testing::Test {
4241
protected:
4342
void SetUp() override {
44-
arena_ = std::make_unique<sql_parser::Arena>(65536, 1048576);
45-
exec_ = std::make_unique<sql_engine::MySQLRemoteExecutor>(*arena_);
43+
exec_ = std::make_unique<sql_engine::MySQLRemoteExecutor>();
4644
exec_->add_backend(make_mysql_config());
4745
}
4846

4947
void TearDown() override {
5048
exec_->disconnect_all();
5149
exec_.reset();
52-
arena_.reset();
5350
}
5451

55-
std::unique_ptr<sql_parser::Arena> arena_;
5652
std::unique_ptr<sql_engine::MySQLRemoteExecutor> exec_;
5753
};
5854

tests/test_pgsql_executor.cpp

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
#include <gtest/gtest.h>
22
#include "sql_engine/pgsql_remote_executor.h"
33
#include "sql_engine/backend_config.h"
4-
#include "sql_parser/arena.h"
54

65
#include <libpq-fe.h>
76

@@ -43,18 +42,15 @@ sql_engine::BackendConfig make_pgsql_config(const char* name = "test_pgsql") {
4342
class PgSQLExecutorTest : public ::testing::Test {
4443
protected:
4544
void SetUp() override {
46-
arena_ = std::make_unique<sql_parser::Arena>(65536, 1048576);
47-
exec_ = std::make_unique<sql_engine::PgSQLRemoteExecutor>(*arena_);
45+
exec_ = std::make_unique<sql_engine::PgSQLRemoteExecutor>();
4846
exec_->add_backend(make_pgsql_config());
4947
}
5048

5149
void TearDown() override {
5250
exec_->disconnect_all();
5351
exec_.reset();
54-
arena_.reset();
5552
}
5653

57-
std::unique_ptr<sql_parser::Arena> arena_;
5854
std::unique_ptr<sql_engine::PgSQLRemoteExecutor> exec_;
5955
};
6056

0 commit comments

Comments
 (0)