Skip to content

Commit 31e4809

Browse files
committed
feat: session auto-enlistment for 2PC DML routing
Wire Session::execute_statement() to route DML through TransactionManager::route_dml() when a distributed transaction is active and sharding is configured. Previously, DML in a session with a distributed transaction bypassed the 2PC pinned session and went through a fresh pool connection. - Add is_distributed() and route_dml() virtual methods to TransactionManager with safe defaults - Override both in DistributedTransactionManager to delegate to execute_participant_dml() - Rewrite Session::execute_statement() DML path to use DistributedPlanner::distribute_dml() when sharding is configured, routing through route_dml() for distributed txns or execute_dml() otherwise - Add for_each_remote_scan() helper for scatter DML across SET_OP trees - Add 8 tests covering defaults, overrides, and session-level routing
1 parent 43f4c39 commit 31e4809

4 files changed

Lines changed: 403 additions & 10 deletions

File tree

include/sql_engine/distributed_txn.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,10 @@ class DistributedTransactionManager : public TransactionManager {
156156
// backend isn't enlisted and can't be enlisted, the result is an
157157
// error.
158158
//
159-
// External DML routing (Session::execute_statement) currently does
160-
// NOT call this method -- it calls executor_->execute_dml directly,
161-
// which means DML inside a distributed transaction currently goes
162-
// through a fresh pool connection and is NOT part of the 2PC. The
163-
// integration hook is a follow-up. For now, unit tests that want
164-
// to simulate in-transaction DML should call this method
165-
// explicitly.
159+
// External DML routing: Session::execute_statement() now routes DML
160+
// through this method when a distributed transaction is active and
161+
// sharding is configured. This ensures DML inside a distributed
162+
// transaction goes through the pinned session and is part of the 2PC.
166163
DmlResult execute_participant_dml(const char* backend_name,
167164
sql_parser::StringRef sql) {
168165
DmlResult r;
@@ -248,6 +245,13 @@ class DistributedTransactionManager : public TransactionManager {
248245
bool is_auto_commit() const override { return auto_commit_; }
249246
void set_auto_commit(bool ac) override { auto_commit_ = ac; }
250247

248+
bool is_distributed() const override { return true; }
249+
250+
DmlResult route_dml(const char* backend_name,
251+
sql_parser::StringRef sql) override {
252+
return execute_participant_dml(backend_name, sql);
253+
}
254+
251255
const std::string& txn_id() const { return txn_id_; }
252256
const std::vector<std::string>& participants() const { return participants_; }
253257

include/sql_engine/session.h

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <cstring>
2020
#include <string>
21+
#include <functional>
2122
#include <unordered_map>
2223
#include <list>
2324
#include <memory>
@@ -201,9 +202,56 @@ class Session {
201202
bool implicit_txn = txn_mgr_.is_auto_commit() && !txn_mgr_.in_transaction();
202203
if (implicit_txn) txn_mgr_.begin();
203204

204-
PlanExecutor<D> executor(functions_, catalog_, parser_.arena());
205-
wire_executor(executor);
206-
DmlResult result = executor.execute_dml(plan);
205+
DmlResult result;
206+
207+
// If sharding is configured, distribute DML to remote backends.
208+
if (shard_map_ && remote_executor_) {
209+
DistributedPlanner<D> dp(*shard_map_, catalog_, parser_.arena(),
210+
remote_executor_, &functions_);
211+
PlanNode* dist_plan = dp.distribute_dml(plan);
212+
213+
if (dist_plan && dist_plan->type == PlanNodeType::REMOTE_SCAN) {
214+
// Single-shard DML
215+
sql_parser::StringRef sql_ref{dist_plan->remote_scan.remote_sql,
216+
dist_plan->remote_scan.remote_sql_len};
217+
if (txn_mgr_.in_transaction() && txn_mgr_.is_distributed()) {
218+
result = txn_mgr_.route_dml(dist_plan->remote_scan.backend_name, sql_ref);
219+
} else {
220+
result = remote_executor_->execute_dml(
221+
dist_plan->remote_scan.backend_name, sql_ref);
222+
}
223+
} else if (dist_plan && dist_plan->type == PlanNodeType::SET_OP) {
224+
// Scatter DML to multiple shards
225+
result.success = true;
226+
result.affected_rows = 0;
227+
for_each_remote_scan(dist_plan, [&](const PlanNode* rs) {
228+
sql_parser::StringRef s{rs->remote_scan.remote_sql,
229+
rs->remote_scan.remote_sql_len};
230+
DmlResult shard_result;
231+
if (txn_mgr_.in_transaction() && txn_mgr_.is_distributed()) {
232+
shard_result = txn_mgr_.route_dml(rs->remote_scan.backend_name, s);
233+
} else {
234+
shard_result = remote_executor_->execute_dml(
235+
rs->remote_scan.backend_name, s);
236+
}
237+
if (!shard_result.success) {
238+
result.success = false;
239+
result.error_message = shard_result.error_message;
240+
}
241+
result.affected_rows += shard_result.affected_rows;
242+
});
243+
} else {
244+
// Not distributed (table not in shard map) -- local execution
245+
PlanExecutor<D> executor(functions_, catalog_, parser_.arena());
246+
wire_executor(executor);
247+
result = executor.execute_dml(plan);
248+
}
249+
} else {
250+
// No sharding: local execution
251+
PlanExecutor<D> executor(functions_, catalog_, parser_.arena());
252+
wire_executor(executor);
253+
result = executor.execute_dml(plan);
254+
}
207255

208256
if (implicit_txn) {
209257
if (result.success)
@@ -294,6 +342,19 @@ class Session {
294342
plan_cache_[plan_cache_order_.front().key] = plan_cache_order_.begin();
295343
}
296344

345+
static void for_each_remote_scan(const PlanNode* node,
346+
const std::function<void(const PlanNode*)>& fn) {
347+
if (!node) return;
348+
if (node->type == PlanNodeType::REMOTE_SCAN) {
349+
fn(node);
350+
return;
351+
}
352+
if (node->type == PlanNodeType::SET_OP) {
353+
for_each_remote_scan(node->left, fn);
354+
for_each_remote_scan(node->right, fn);
355+
}
356+
}
357+
297358
void wire_executor(PlanExecutor<D>& executor) {
298359
for (auto& kv : sources_)
299360
executor.add_data_source(kv.first.c_str(), kv.second);

include/sql_engine/transaction_manager.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#ifndef SQL_ENGINE_TRANSACTION_MANAGER_H
22
#define SQL_ENGINE_TRANSACTION_MANAGER_H
33

4+
#include "sql_engine/dml_result.h"
5+
#include "sql_parser/common.h"
6+
47
namespace sql_engine {
58

69
// Abstract interface for transaction management.
@@ -24,6 +27,19 @@ class TransactionManager {
2427
virtual bool in_transaction() const = 0;
2528
virtual bool is_auto_commit() const = 0;
2629
virtual void set_auto_commit(bool ac) = 0;
30+
31+
// Distributed transaction support. Override in DistributedTransactionManager.
32+
virtual bool is_distributed() const { return false; }
33+
34+
// Route DML through the transaction's pinned session for a given backend.
35+
// Default: returns DmlResult with success=false.
36+
// DistributedTransactionManager overrides to route through execute_participant_dml.
37+
virtual DmlResult route_dml(const char* /*backend_name*/,
38+
sql_parser::StringRef /*sql*/) {
39+
DmlResult r;
40+
r.error_message = "route_dml not supported by this transaction manager";
41+
return r;
42+
}
2743
};
2844

2945
} // namespace sql_engine

0 commit comments

Comments
 (0)