Skip to content

Commit de8fcbb

Browse files
committed
feat: add real sharding demo with 2 MySQL backends
- Session now integrates DistributedPlanner with ShardMap support
- CLI tool auto-discovers table schemas from backends via SHOW COLUMNS
- InMemoryCatalog gains vector<ColumnDef> overload for dynamic schema
- Demo scripts for 2-shard MySQL setup with 10 users + 10 orders
- Working: scan all shards, filter pushdown, expression evaluation
- Known issues: aggregate column names, sort across shards, join column resolution
1 parent dde58fb commit de8fcbb

6 files changed

Lines changed: 314 additions & 0 deletions

File tree

include/sql_engine/in_memory_catalog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class InMemoryCatalog : public Catalog {
1414
// Add a table with columns.
1515
void add_table(const char* schema, const char* table,
1616
std::initializer_list<ColumnDef> columns);
17+
void add_table(const char* schema, const char* table,
18+
const std::vector<ColumnDef>& columns);
1719

1820
// Remove a table.
1921
void drop_table(const char* schema, const char* table);

include/sql_engine/session.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include "sql_engine/plan_builder.h"
77
#include "sql_engine/dml_plan_builder.h"
88
#include "sql_engine/optimizer.h"
9+
#include "sql_engine/distributed_planner.h"
10+
#include "sql_engine/shard_map.h"
911
#include "sql_engine/catalog.h"
1012
#include "sql_engine/function_registry.h"
1113
#include "sql_engine/result_set.h"
@@ -52,6 +54,10 @@ class Session {
5254
remote_executor_ = exec;
5355
}
5456

57+
// Install the shard map consulted by execute_query(). Only the pointer is
// stored (no copy is made), so the caller must keep the ShardMap alive for
// as long as this session may run queries. execute_query() invokes the
// DistributedPlanner only when both a shard map and a remote executor are
// set, so passing nullptr leaves plans undistributed.
void set_shard_map(const ShardMap* sm) { shard_map_ = sm; }
60+
5561
// Execute a SELECT query. Returns a ResultSet.
5662
ResultSet execute_query(const char* sql, size_t len) {
5763
parser_.reset();
@@ -66,6 +72,12 @@ class Session {
6672

6773
plan = optimizer_.optimize(plan, parser_.arena());
6874

75+
// Distribute across shards if shard map is configured
76+
if (shard_map_ && remote_executor_) {
77+
DistributedPlanner<D> dplanner(*shard_map_, catalog_, parser_.arena(), remote_executor_, &functions_);
78+
plan = dplanner.distribute(plan);
79+
}
80+
6981
PlanExecutor<D> executor(functions_, catalog_, parser_.arena());
7082
wire_executor(executor);
7183
return executor.execute(plan);
@@ -180,6 +192,7 @@ class Session {
180192
FunctionRegistry<D> functions_;
181193
Optimizer<D> optimizer_;
182194
RemoteExecutor* remote_executor_ = nullptr;
195+
const ShardMap* shard_map_ = nullptr;
183196
std::unordered_map<std::string, DataSource*> sources_;
184197
std::unordered_map<std::string, MutableDataSource*> mutable_sources_;
185198

scripts/run_sharding_demo.sh

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
#!/bin/bash
# Run the distributed query demo against 2 MySQL shards.
#
# Expects the containers parsersql-shard1 / parsersql-shard2 to be running
# (see scripts/start_sharding_demo.sh). Each demo query is piped into
# ./sqlengine with both backends and the users/orders shard definitions.
set -e

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
cd "$PROJECT_DIR"

# Check that BOTH shards are running; every query below talks to both.
for shard in parsersql-shard1 parsersql-shard2; do
    if ! docker exec "$shard" mysql -uroot -ptest -e "SELECT 1" &>/dev/null; then
        echo "ERROR: Shards not running. Start them with: ./scripts/start_sharding_demo.sh"
        exit 1
    fi
done

# Build if needed. Compiler output is left visible so a failed build can be
# diagnosed instead of the script silently exiting under `set -e`.
if [ ! -f ./sqlengine ]; then
    echo "Building sqlengine..."
    make sqlengine
fi

# Print a three-line section banner with the given title.
banner() {
    echo "=============================================="
    echo " $1"
    echo "=============================================="
}

banner "Distributed SQL Engine - Sharding Demo"
echo ""
echo "Setup: 10 users + 10 orders split across 2 MySQL shards"
echo " Shard 1 (port 13306): users 1-5 + their orders"
echo " Shard 2 (port 13307): users 6-10 + their orders"
echo ""
echo "The engine parses SQL, determines which shards to query,"
echo "fetches data from both, and merges results locally."
echo ""

# run_query DESCRIPTION SQL
# Prints the description and the SQL text, then feeds the SQL to sqlengine
# on stdin. The pipeline runs as an `if` condition so that a failing query
# is reported and the demo moves on, rather than `set -e` aborting the
# remaining sections.
run_query() {
    local desc="$1"
    local sql="$2"
    echo "----------------------------------------------"
    echo "QUERY: $desc"
    echo "SQL: $sql"
    echo ""
    if ! echo "$sql" | ./sqlengine \
        --backend "mysql://root:test@127.0.0.1:13306/testdb?name=shard1" \
        --backend "mysql://root:test@127.0.0.1:13307/testdb?name=shard2" \
        --shard "users:id:shard1,shard2" \
        --shard "orders:id:shard1,shard2" \
        2>&1; then
        echo "(query failed; continuing with the next one)"
    fi
    echo ""
}

banner "1. Scan all rows from both shards"
run_query "All users across both shards" \
    "SELECT * FROM users"

banner "2. Filter pushdown to shards"
run_query "Engineers only (filter pushed to both shards)" \
    "SELECT name, age, salary FROM users WHERE dept = 'Engineering'"

banner "3. Distributed aggregation"
# The description matches the SQL: only COUNT(*) is computed here.
run_query "Count by department (merged from 2 shards)" \
    "SELECT dept, COUNT(*) FROM users GROUP BY dept"

banner "4. Distributed sort + limit"
run_query "Top 3 highest paid (merge-sort across shards)" \
    "SELECT name, salary FROM users ORDER BY salary DESC LIMIT 3"

banner "5. Cross-shard join"
run_query "Join users and orders (both fetched from shards, joined locally)" \
    "SELECT u.name, o.total, o.status FROM users u JOIN orders o ON u.id = o.user_id"

banner "6. Expression evaluation"
run_query "Pure expression (no backend needed)" \
    "SELECT 1 + 2, UPPER('distributed'), COALESCE(NULL, 'sql'), 42 * 3"

banner "7. Subquery"
run_query "Subquery: users with above-average age" \
    "SELECT name, age FROM users WHERE age > (SELECT AVG(age) FROM users)"

banner "Demo Complete!"
echo ""
echo "To stop the shards: docker rm -f parsersql-shard1 parsersql-shard2"

scripts/start_sharding_demo.sh

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#!/bin/bash
# Start a 2-shard MySQL demo environment.
#
# Creates (or restarts) two MySQL containers, waits until each accepts TCP
# connections, then (re)loads the demo schema and data:
#   parsersql-shard1 (host port 13306): users 1-5  + their orders
#   parsersql-shard2 (host port 13307): users 6-10 + their orders
#
# Set READY_TIMEOUT_SECS to change how long to wait for each shard (default 120).
set -euo pipefail

MYSQL_IMAGE="mysql:8.0"
MYSQL_ROOT_PW="test"
MYSQL_DB="testdb"
READY_TIMEOUT_SECS="${READY_TIMEOUT_SECS:-120}"

# start_shard NAME HOST_PORT
# Ensures the container NAME is running: leaves a running container alone,
# restarts a stopped one, and creates it otherwise. Errors from docker are
# no longer discarded, so a missing daemon or a busy port fails fast instead
# of leaving the readiness loop waiting on a container that never started.
start_shard() {
    local name="$1"
    local port="$2"
    local running
    running="$(docker container inspect -f '{{.State.Running}}' "$name" 2>/dev/null || true)"
    case "$running" in
        true)
            echo "$name is already running"
            ;;
        false)
            docker start "$name"
            ;;
        *)
            docker run -d --name "$name" \
                -p "${port}:3306" \
                -e MYSQL_ROOT_PASSWORD="$MYSQL_ROOT_PW" \
                -e MYSQL_DATABASE="$MYSQL_DB" \
                "$MYSQL_IMAGE"
            ;;
    esac
}

# wait_for_shard NAME LABEL
# Polls once per second until MySQL in container NAME answers a query over
# TCP against the demo database. -h127.0.0.1 forces a TCP connection rather
# than the unix socket, so the probe is not satisfied by the temporary
# socket-only server the image runs during first-time initialization.
# Gives up after READY_TIMEOUT_SECS seconds.
wait_for_shard() {
    local name="$1"
    local label="$2"
    local waited=0
    echo "Waiting for $label..."
    until docker exec "$name" mysql -h127.0.0.1 -uroot -p"$MYSQL_ROOT_PW" \
            -e "SELECT 1" "$MYSQL_DB" &>/dev/null; do
        if [ "$waited" -ge "$READY_TIMEOUT_SECS" ]; then
            echo "ERROR: $label ($name) not ready after ${READY_TIMEOUT_SECS}s" >&2
            docker logs --tail 20 "$name" >&2 || true
            exit 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
    echo "$label ready"
}

# Emits the DDL shared by both shards (drops and recreates users/orders).
schema_sql() {
    cat <<'SQL'
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS users;

CREATE TABLE users (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
age INT,
dept VARCHAR(100),
salary DECIMAL(10,2)
);

CREATE TABLE orders (
id INT PRIMARY KEY,
user_id INT,
total DECIMAL(10,2),
status VARCHAR(50),
created_at DATE
);

SQL
}

# load_shard NAME
# Recreates the schema in container NAME, then executes the shard-specific
# INSERT statements read from this function's stdin.
load_shard() {
    local name="$1"
    { schema_sql; cat; } | docker exec -i "$name" mysql -uroot -p"$MYSQL_ROOT_PW" "$MYSQL_DB"
}

echo "=== Starting 2-shard MySQL demo ==="

# Shard 1: users with id 1-5, orders for those users
start_shard parsersql-shard1 13306

# Shard 2: users with id 6-10, orders for those users
start_shard parsersql-shard2 13307

wait_for_shard parsersql-shard1 "Shard 1"
wait_for_shard parsersql-shard2 "Shard 2"

echo "Loading data into Shard 1 (users 1-5)..."
load_shard parsersql-shard1 <<'SQL'
INSERT INTO users VALUES
(1, 'Alice', 30, 'Engineering', 95000.00),
(2, 'Bob', 25, 'Sales', 65000.00),
(3, 'Carol', 35, 'Engineering', 110000.00),
(4, 'Dave', 28, 'Marketing', 70000.00),
(5, 'Eve', 32, 'Engineering', 105000.00);

INSERT INTO orders VALUES
(101, 1, 150.00, 'completed', '2024-01-15'),
(102, 2, 75.50, 'pending', '2024-02-20'),
(103, 1, 200.00, 'completed', '2024-03-10'),
(104, 3, 50.00, 'cancelled', '2024-01-25'),
(105, 5, 300.00, 'completed', '2024-04-05');
SQL

echo "Loading data into Shard 2 (users 6-10)..."
load_shard parsersql-shard2 <<'SQL'
INSERT INTO users VALUES
(6, 'Frank', 45, 'Engineering', 130000.00),
(7, 'Grace', 29, 'Sales', 68000.00),
(8, 'Heidi', 38, 'Marketing', 85000.00),
(9, 'Ivan', 27, 'Engineering', 90000.00),
(10, 'Judy', 33, 'Sales', 72000.00);

INSERT INTO orders VALUES
(106, 6, 500.00, 'completed', '2024-02-01'),
(107, 7, 125.00, 'pending', '2024-03-15'),
(108, 8, 250.00, 'completed', '2024-01-30'),
(109, 9, 80.00, 'completed', '2024-04-10'),
(110, 10, 175.00, 'cancelled', '2024-02-28');
SQL

echo ""
echo "=== Sharding Demo Ready ==="
echo ""
echo "Shard 1 (port 13306): users 1-5 (Alice, Bob, Carol, Dave, Eve)"
echo "Shard 2 (port 13307): users 6-10 (Frank, Grace, Heidi, Ivan, Judy)"
echo ""
echo "Each shard has 5 users and 5 orders."
echo "Total: 10 users, 10 orders across 2 shards."
echo ""
echo "To run the demo:"
echo ' ./scripts/run_sharding_demo.sh'
echo ""
echo "To stop:"
echo ' docker rm -f parsersql-shard1 parsersql-shard2'

src/sql_engine/in_memory_catalog.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,40 @@ void InMemoryCatalog::add_table(const char* schema, const char* table,
8181
}
8282
}
8383

84+
void InMemoryCatalog::add_table(const char* schema, const char* table,
85+
const std::vector<ColumnDef>& columns) {
86+
// Delegate to the initializer_list version by reconstructing the same logic
87+
std::string key = make_key(schema, table);
88+
TableData& td = tables_[key];
89+
td.schema_str = schema ? schema : "";
90+
td.table_str = table;
91+
td.column_names.clear();
92+
td.columns.clear();
93+
uint16_t ordinal = 0;
94+
for (const auto& def : columns) {
95+
td.column_names.emplace_back(def.name);
96+
ColumnInfo ci{};
97+
ci.type = def.type;
98+
ci.ordinal = ordinal++;
99+
ci.nullable = def.nullable;
100+
td.columns.push_back(ci);
101+
}
102+
for (size_t i = 0; i < td.columns.size(); ++i) {
103+
td.columns[i].name = sql_parser::StringRef{
104+
td.column_names[i].c_str(),
105+
static_cast<uint32_t>(td.column_names[i].size())
106+
};
107+
}
108+
td.info.schema_name = sql_parser::StringRef{
109+
td.schema_str.c_str(), static_cast<uint32_t>(td.schema_str.size())
110+
};
111+
td.info.table_name = sql_parser::StringRef{
112+
td.table_str.c_str(), static_cast<uint32_t>(td.table_str.size())
113+
};
114+
td.info.columns = td.columns.data();
115+
td.info.column_count = static_cast<uint16_t>(td.columns.size());
116+
}
117+
84118
void InMemoryCatalog::drop_table(const char* schema, const char* table) {
85119
std::string key = make_key(schema, table);
86120
tables_.erase(key);

tools/sqlengine.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,11 +423,67 @@ int main(int argc, char* argv[]) {
423423
}
424424
}
425425

426+
// Auto-discover table schemas from the first backend for each sharded table
427+
if (remote_exec && !shards.empty()) {
428+
for (auto& sc : shards) {
429+
const char* first_backend = sc.shards.empty() ? nullptr : sc.shards[0].backend_name.c_str();
430+
if (!first_backend) continue;
431+
432+
// Query SHOW COLUMNS FROM table
433+
std::string show_sql = "SHOW COLUMNS FROM " + sc.table_name;
434+
sql_parser::StringRef sql_ref{show_sql.c_str(), static_cast<uint32_t>(show_sql.size())};
435+
try {
436+
ResultSet cols = remote_exec->execute(first_backend, sql_ref);
437+
std::vector<ColumnDef> col_defs;
438+
for (size_t i = 0; i < cols.row_count(); ++i) {
439+
const Row& r = cols.rows[i];
440+
std::string col_name;
441+
if (r.column_count > 0 && !r.get(0).is_null()) {
442+
col_name.assign(r.get(0).str_val.ptr, r.get(0).str_val.len);
443+
}
444+
std::string col_type_str;
445+
if (r.column_count > 1 && !r.get(1).is_null()) {
446+
col_type_str.assign(r.get(1).str_val.ptr, r.get(1).str_val.len);
447+
}
448+
// Map MySQL type string to SqlType
449+
SqlType st;
450+
if (col_type_str.find("int") != std::string::npos ||
451+
col_type_str.find("INT") != std::string::npos) {
452+
st = SqlType::make_int();
453+
} else if (col_type_str.find("decimal") != std::string::npos ||
454+
col_type_str.find("DECIMAL") != std::string::npos) {
455+
st = SqlType::make_decimal(10, 2);
456+
} else if (col_type_str.find("date") != std::string::npos ||
457+
col_type_str.find("DATE") != std::string::npos) {
458+
st = SqlType{SqlType::DATE};
459+
} else {
460+
st = SqlType::make_varchar(255);
461+
}
462+
bool nullable = true;
463+
if (r.column_count > 2 && !r.get(2).is_null()) {
464+
std::string null_str(r.get(2).str_val.ptr, r.get(2).str_val.len);
465+
nullable = (null_str == "YES");
466+
}
467+
// Store column name persistently
468+
col_defs.push_back(ColumnDef{strdup(col_name.c_str()), st, nullable});
469+
}
470+
if (!col_defs.empty()) {
471+
catalog.add_table("", sc.table_name.c_str(), col_defs);
472+
}
473+
} catch (...) {
474+
// Schema discovery failed — continue without catalog entry
475+
}
476+
}
477+
}
478+
426479
// Create session
427480
Session<Dialect::MySQL> session(catalog, txn_mgr);
428481
if (remote_exec) {
429482
session.set_remote_executor(remote_exec);
430483
}
484+
if (!shards.empty()) {
485+
session.set_shard_map(&shard_map);
486+
}
431487

432488
// Detect interactive mode
433489
bool interactive = isatty(fileno(stdin));

0 commit comments

Comments
 (0)