Skip to content

Commit b36dcaa

Browse files
committed
feat: temporal functions + 2PC durable log + connection timeouts
Adds the two biggest gaps from the codebase review: temporal functions (NOW, EXTRACT equivalents, epoch conversion, DATEDIFF) and 2PC crash recovery via a write-ahead log. Also adds client-level read/write timeouts to all backend connections so a wedged backend can no longer hang the coordinator forever. 1094/1131 tests pass, 0 failures, 37 skipped (require Docker). +37 new tests (21 datetime, 11 WAL, 3 timeout, 2 misc). ----- Temporal functions ----- Zero temporal functions were previously registered. Any query referring to NOW(), CURRENT_DATE, YEAR(col), MONTH(col), UNIX_TIMESTAMP(), etc. failed -- a huge gap since a large fraction of real analytical queries use at least one of these. Registered for both MySQL and PostgreSQL dialects: - NOW, CURRENT_TIMESTAMP -> DATETIME (current wall time) - CURRENT_DATE, CURDATE -> DATE - CURRENT_TIME, CURTIME -> TIME - YEAR, MONTH, DAY, DAYOFMONTH -> INT64 component extractors - HOUR, MINUTE, SECOND -> INT64 component extractors - UNIX_TIMESTAMP (0 or 1 arg) -> INT64 seconds since epoch - FROM_UNIXTIME (1 arg) -> DATETIME - DATEDIFF (2 args) -> INT64 days between dates All extractors accept DATE, DATETIME, TIMESTAMP, or a parseable string on input; all return NULL for NULL inputs or unparseable strings. DATEDIFF uses floor division so negative differences round correctly across the day boundary. Also fixed sqlengine's CLI display formatter to use the datetime_parse format helpers, so temporal columns now render as "2026-04-10 14:52:21" instead of "DATETIME(1775832688494873)". ----- 2PC durable log (crash recovery) ----- The single most dangerous gap in the distributed layer: a crash between phase 1 (all participants PREPARE OK) and phase 2 (dispatch COMMIT) left prepared transactions stranded on every backend with no automatic recovery path. Fixed by a new minimal append-only WAL: include/sql_engine/durable_txn_log.h Format: one record per line, tab-separated: COMMIT\t<txn_id>\t<participant1>,<participant2>,...\n ROLLBACK\t<txn_id>\t<participant1>,<participant2>,...\n COMPLETE\t<txn_id>\n - log_decision() writes synchronously + fsync before phase 2 runs - log_complete() writes synchronously + fsync after phase 2 succeeds - scan_in_doubt() reads the log and returns every decision that has no matching COMPLETE -- those are the transactions a DBA (or future automatic recovery) needs to resolve DistributedTransactionManager grew set_durable_log() (optional -- a nullptr log disables WAL and preserves legacy behavior) and an important semantic fix: maybe_log_complete() is now called ONLY when phase 2 commit fully succeeded. A partial phase-2 commit leaves the transaction in-doubt in the log, which is the correct recovery invariant. Previously the code logged COMPLETE unconditionally, which would have lost the in-doubt state. Tests include the full commit happy path, phase-2 failure leaving in-doubt, phase-1 failure recording ROLLBACK + COMPLETE, rollback path, persistence across close/reopen, and scan of empty/missing files. Limitations of the MVP (documented in the header): - Single log file, no rotation. Long-running coordinators will grow the file indefinitely; compaction is a follow-up. - Recovery resolution (actually contacting backends to finish the work) lives in scan_in_doubt's caller; this class just produces the work list. ----- Connection-level timeouts ----- The agent's review flagged "no query timeouts" as a 2PC hazard: a wedged backend during XA PREPARE could block the entire distributed transaction forever holding locks on every other shard. Added client- level read/write timeouts so libmysqlclient / libpq will fail the call after a bounded wait: connection_pool.h, mysql_remote_executor.cpp: MYSQL_OPT_CONNECT_TIMEOUT = 5s (was already set) MYSQL_OPT_READ_TIMEOUT = 30s (NEW) MYSQL_OPT_WRITE_TIMEOUT = 30s (NEW) pgsql_remote_executor.cpp: options='-c statement_timeout=30000' in conninfo (NEW) All three are overridable at compile time via SQL_ENGINE_MYSQL_{CONNECT,READ,WRITE}_TIMEOUT_SEC and SQL_ENGINE_PG_STATEMENT_TIMEOUT_MS macros. On top of the connection-level ceilings, DistributedTransactionManager now has set_phase_statement_timeout_ms() which issues an explicit SET SESSION max_execution_time (MySQL) or SET LOCAL statement_timeout (PostgreSQL) before each phase-1 / phase-2 SQL. This lets callers tighten the 2PC-specific timeout below the connection default without affecting other queries on the same backend. Best-effort on pooled connections (the SET may end up on a different physical connection than the next statement), but the connection-level timeout is the real ceiling. Known not-yet-fixed: - PostgreSQL PREPARE TRANSACTION connection-pinning issue (requires a RemoteExecutor API change to bind a transaction to a specific physical connection across calls; deferred). - Automatic in-doubt recovery on startup (the WAL reports the work, but nothing currently drives the XA COMMIT / XA ROLLBACK replay).
1 parent d71571c commit b36dcaa

11 files changed

Lines changed: 1255 additions & 14 deletions

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ TEST_SRCS = $(TEST_DIR)/test_main.cpp \
8181
$(TEST_DIR)/test_window.cpp \
8282
$(TEST_DIR)/test_cte.cpp \
8383
$(TEST_DIR)/test_datetime_format.cpp \
84+
$(TEST_DIR)/test_datetime_funcs.cpp \
8485
$(TEST_DIR)/test_result_set.cpp
8586
TEST_OBJS = $(TEST_SRCS:.cpp=.o)
8687
TEST_TARGET = $(PROJECT_ROOT)/run_tests

include/sql_engine/connection_pool.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,25 @@
1919

2020
namespace sql_engine {
2121

22+
// Default timeouts (seconds). These bound how long any single libmysqlclient
23+
// read or write can block. Without them, a wedged backend or a network
24+
// partition during XA PREPARE would hang the entire 2PC coordinator
25+
// indefinitely. Set generously enough that healthy OLTP queries never hit
26+
// them (30s is well above normal p99 latency on any remotely sane backend).
27+
//
28+
// These are set at mysql_options time, so they apply to the current
29+
// connection only; a different pooled connection on the same backend gets
30+
// the same default.
31+
#ifndef SQL_ENGINE_MYSQL_CONNECT_TIMEOUT_SEC
32+
#define SQL_ENGINE_MYSQL_CONNECT_TIMEOUT_SEC 5
33+
#endif
34+
#ifndef SQL_ENGINE_MYSQL_READ_TIMEOUT_SEC
35+
#define SQL_ENGINE_MYSQL_READ_TIMEOUT_SEC 30
36+
#endif
37+
#ifndef SQL_ENGINE_MYSQL_WRITE_TIMEOUT_SEC
38+
#define SQL_ENGINE_MYSQL_WRITE_TIMEOUT_SEC 30
39+
#endif
40+
2241
class ConnectionPool {
2342
public:
2443
ConnectionPool() = default;
@@ -86,8 +105,12 @@ class ConnectionPool {
86105
MYSQL* c = mysql_init(nullptr);
87106
if (!c) throw std::runtime_error("mysql_init failed for " + cfg.name);
88107

89-
unsigned int timeout = 5;
90-
mysql_options(c, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
108+
unsigned int connect_timeout = SQL_ENGINE_MYSQL_CONNECT_TIMEOUT_SEC;
109+
unsigned int read_timeout = SQL_ENGINE_MYSQL_READ_TIMEOUT_SEC;
110+
unsigned int write_timeout = SQL_ENGINE_MYSQL_WRITE_TIMEOUT_SEC;
111+
mysql_options(c, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout);
112+
mysql_options(c, MYSQL_OPT_READ_TIMEOUT, &read_timeout);
113+
mysql_options(c, MYSQL_OPT_WRITE_TIMEOUT, &write_timeout);
91114

92115
if (!mysql_real_connect(c, cfg.host.c_str(), cfg.user.c_str(),
93116
cfg.password.c_str(), cfg.database.c_str(),

include/sql_engine/distributed_txn.h

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "sql_engine/transaction_manager.h"
55
#include "sql_engine/remote_executor.h"
66
#include "sql_engine/shard_map.h"
7+
#include "sql_engine/durable_txn_log.h"
78
#include "sql_parser/common.h"
89

910
#include <string>
@@ -37,6 +38,44 @@ class DistributedTransactionManager : public TransactionManager {
3738
BackendDialect dialect = BackendDialect::MYSQL)
3839
: executor_(executor), dialect_(dialect) {}
3940

41+
// Attach a durable write-ahead log for 2PC recovery. Optional but
42+
// strongly recommended for any real workload: without it, a crash
43+
// between phase 1 and phase 2 leaves prepared transactions on every
44+
// backend with no automatic recovery path.
45+
//
46+
// The log pointer must outlive this manager. Pass nullptr to disable
47+
// logging (the default -- matches pre-existing behavior).
48+
void set_durable_log(DurableTransactionLog* log) { txn_log_ = log; }
49+
50+
// Require the WAL to succeed for commits to proceed. Default: false.
51+
// When true, if log_decision() fails, we refuse to start phase 2 and
52+
// roll back instead -- trading availability for durability. When
53+
// false, a log write failure is logged to stderr but the commit
54+
// continues (caller might prefer availability over durability).
55+
void set_require_durable_log(bool required) { require_durable_log_ = required; }
56+
57+
// Set a tight per-phase statement timeout (in milliseconds). When > 0,
58+
// the manager issues a SET SESSION max_execution_time (MySQL) or
59+
// SET LOCAL statement_timeout (PostgreSQL) on each participant BEFORE
60+
// phase 1 (XA PREPARE / PREPARE TRANSACTION) and again before phase 2
61+
// (XA COMMIT / COMMIT PREPARED). This is independent of the backend
62+
// connection's default read/write timeout -- use it to bound 2PC
63+
// specifically without affecting other queries.
64+
//
65+
// 0 (default) means "don't override", fall back to whatever the
66+
// backend connection's read/write timeout provides.
67+
//
68+
// NOTE: because ThreadSafeMultiRemoteExecutor may hand us a different
69+
// pooled connection for each execute_dml call, the SET must be issued
70+
// immediately before the statement whose timeout it bounds. We do
71+
// that by concatenating them with "; " when multi-statement is
72+
// supported, OR by issuing two separate execute_dml calls and
73+
// tolerating that the second one may not actually have the timeout
74+
// in effect (best-effort). For the MVP we use the two-call approach.
75+
void set_phase_statement_timeout_ms(uint32_t ms) {
76+
phase_statement_timeout_ms_ = ms;
77+
}
78+
4079
bool begin() override {
4180
txn_id_ = generate_txn_id();
4281
participants_.clear();
@@ -77,20 +116,69 @@ class DistributedTransactionManager : public TransactionManager {
77116

78117
// Phase 1: prepare all participants
79118
if (!phase1_prepare()) {
119+
// Durably record the ROLLBACK decision BEFORE dispatching to
120+
// participants. If we crash between here and the rollback
121+
// completing, recovery replays the rollback.
122+
if (!log_decision_or_fail(DurableTransactionLog::Decision::ROLLBACK)) {
123+
// Caller asked us to require durable logging and it failed
124+
// -- leave transactions prepared rather than lose the
125+
// decision. A DBA will resolve them manually via
126+
// XA RECOVER + XA ROLLBACK.
127+
active_ = false;
128+
return false;
129+
}
80130
phase2_rollback();
131+
// Rollback is best-effort; mark COMPLETE regardless. An in-doubt
132+
// prepared transaction left after a partial rollback is
133+
// recorded separately (if we wanted) -- currently we accept
134+
// that rollback failure is a separate class of operator issue.
135+
maybe_log_complete();
136+
active_ = false;
137+
return false;
138+
}
139+
140+
// Durably record the COMMIT decision BEFORE phase 2 dispatches.
141+
// This is the core durability invariant: a record of "commit this
142+
// transaction" exists on disk before any participant has been
143+
// told to commit, so a crash before, during, or after phase 2
144+
// is recoverable by replaying the committed decision.
145+
if (!log_decision_or_fail(DurableTransactionLog::Decision::COMMIT)) {
146+
// The WAL is required and it failed. Roll back in-memory state
147+
// so the caller sees the commit fail; prepared transactions
148+
// remain on backends until DBA cleanup.
81149
active_ = false;
82150
return false;
83151
}
84152

85153
// Phase 2: commit all participants
86154
bool ok = phase2_commit();
155+
// Only mark COMPLETE if every participant committed successfully.
156+
// A partial commit is a heuristic hazard: some participants hold
157+
// the data committed, others may still be prepared or failed. The
158+
// transaction remains in-doubt in the log so startup recovery (or
159+
// a DBA) can finish the job.
160+
if (ok) {
161+
maybe_log_complete();
162+
} else {
163+
std::fprintf(stderr,
164+
"[DistributedTransactionManager] phase 2 commit failed for "
165+
"txn %s; leaving in-doubt in the WAL for recovery.\n",
166+
txn_id_.c_str());
167+
}
87168
active_ = false;
88169
return ok;
89170
}
90171

91172
bool rollback() override {
92173
if (!active_) return false;
174+
// Durably record the ROLLBACK decision before dispatching.
175+
(void)log_decision_or_fail(DurableTransactionLog::Decision::ROLLBACK);
93176
phase2_rollback();
177+
// Rollback is best-effort; we mark COMPLETE whether or not every
178+
// backend acknowledged the rollback. A failed rollback on a
179+
// prepared transaction leaves the participant in a bad state that
180+
// a DBA needs to resolve via XA RECOVER.
181+
maybe_log_complete();
94182
active_ = false;
95183
return true;
96184
}
@@ -118,6 +206,62 @@ class DistributedTransactionManager : public TransactionManager {
118206
bool active_ = false;
119207
bool auto_commit_ = true;
120208

209+
DurableTransactionLog* txn_log_ = nullptr;
210+
bool require_durable_log_ = false;
211+
uint32_t phase_statement_timeout_ms_ = 0;
212+
213+
// Best-effort: set a per-session statement timeout on a backend before
214+
// issuing a phase-1 or phase-2 SQL. Returns true if the SET succeeded
215+
// OR if no timeout is configured; false only if the SET itself fails
216+
// and the caller asked for a real timeout (in which case the caller
217+
// may want to abort rather than risk an unbounded hang).
218+
bool maybe_set_statement_timeout(const char* backend) {
219+
if (phase_statement_timeout_ms_ == 0) return true;
220+
std::string sql;
221+
if (dialect_ == BackendDialect::MYSQL) {
222+
// MySQL 5.7.4+: max_execution_time is in milliseconds and
223+
// only bounds SELECTs. For DML and XA commands, the client
224+
// read_timeout is our real protection. We still set this for
225+
// SELECTs that might be issued between phases.
226+
sql = "SET SESSION max_execution_time = " +
227+
std::to_string(phase_statement_timeout_ms_);
228+
} else {
229+
sql = "SET LOCAL statement_timeout = " +
230+
std::to_string(phase_statement_timeout_ms_);
231+
}
232+
return send_sql(backend, sql);
233+
}
234+
235+
// Write the phase-2 decision to the durable log before dispatching.
236+
// Returns true if the commit/rollback can proceed:
237+
// - log not configured: true (log-less mode preserves legacy behavior)
238+
// - log configured and write succeeded: true
239+
// - log configured and write failed:
240+
// - require_durable_log_: false (abort, don't risk a crash
241+
// window without a recoverable decision)
242+
// - !require_durable_log_: true (write failure logged to stderr,
243+
// commit proceeds at the caller's risk)
244+
bool log_decision_or_fail(DurableTransactionLog::Decision d) {
245+
if (!txn_log_) return true;
246+
if (txn_log_->log_decision(txn_id_, d, participants_)) return true;
247+
if (require_durable_log_) {
248+
std::fprintf(stderr,
249+
"[DistributedTransactionManager] WAL write failed for txn %s; "
250+
"refusing to proceed with phase 2 because require_durable_log is set.\n",
251+
txn_id_.c_str());
252+
return false;
253+
}
254+
std::fprintf(stderr,
255+
"[DistributedTransactionManager] WAL write failed for txn %s; "
256+
"proceeding without durability (set require_durable_log to refuse instead).\n",
257+
txn_id_.c_str());
258+
return true;
259+
}
260+
261+
void maybe_log_complete() {
262+
if (txn_log_) txn_log_->log_complete(txn_id_);
263+
}
264+
121265
// Generate a unique transaction ID.
122266
static std::string generate_txn_id() {
123267
auto now = std::chrono::steady_clock::now();
@@ -146,6 +290,13 @@ class DistributedTransactionManager : public TransactionManager {
146290
bool phase1_prepare() {
147291
bool all_ok = true;
148292
for (auto& p : participants_) {
293+
// Best-effort per-phase timeout. See note on
294+
// set_phase_statement_timeout_ms: on ThreadSafeMultiRemoteExecutor
295+
// the SET may end up on a different pooled connection than
296+
// the next statement, so this is advisory. The connection-level
297+
// read/write timeout in connection_pool.h is the real ceiling.
298+
maybe_set_statement_timeout(p.c_str());
299+
149300
bool ok = false;
150301
if (dialect_ == BackendDialect::MYSQL) {
151302
std::string end_sql = "XA END '" + txn_id_ + "'";
@@ -168,6 +319,8 @@ class DistributedTransactionManager : public TransactionManager {
168319
bool phase2_commit() {
169320
bool all_ok = true;
170321
for (auto& p : participants_) {
322+
maybe_set_statement_timeout(p.c_str());
323+
171324
bool ok = false;
172325
if (dialect_ == BackendDialect::MYSQL) {
173326
std::string sql = "XA COMMIT '" + txn_id_ + "'";

0 commit comments

Comments
 (0)