# Distributed Query Planner — Design Specification

## Overview

The distributed planner decomposes logical plans across multiple backends. Given a shard map (which tables live where), it rewrites the plan to push operations to remote backends and merge results locally. This is the core of a Vitess-like distributed query engine.

Sub-project 8 of the query engine. Depends on the logical plan, optimizer, executor, and emitter sub-projects.

### Goals

- **Shard map** — configurable table-to-backend mapping; supports both unsharded and multi-shard tables
- **Full query decomposition** — handles cross-backend joins, distributed aggregation (with partial aggregates), distributed sort+limit (merge-sort), and distributed distinct
- **Remote SQL generation** — produces SQL strings to send to backends, using the existing emitter
- **RemoteExecutor interface** — abstracts backend communication; mocked in tests
- **3 new operators** — RemoteScan, MergeAggregate, MergeSort
- **Correctness-preserving** — distributed execution produces results identical to local execution

### Constraints

- C++17, arena-allocated
- No actual networking — uses the RemoteExecutor interface (implementations deferred)
- No shard-key-aware routing (every shard is queried for sharded tables)
- No distributed transactions

### Non-Goals

- Wire protocol server (separate sub-project)
- Network transport / connection pooling
- Shard key routing (skipping irrelevant shards based on WHERE predicates)
- Distributed writes (INSERT/UPDATE/DELETE across backends)

---

## Shard Map

```cpp
struct ShardInfo {
    std::string backend_name;
};

struct TableShardConfig {
    std::string table_name;
    std::string shard_key;          // empty if unsharded
    std::vector<ShardInfo> shards;  // 1 if unsharded, N if sharded
};

class ShardMap {
public:
    void add_table(const TableShardConfig& config);
    bool is_sharded(StringRef table_name) const;
    const std::vector<ShardInfo>& get_shards(StringRef table_name) const;
    StringRef get_shard_key(StringRef table_name) const;
};
```
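
For illustration, a minimal configuration sketch using the API above; the table and backend names follow the examples later in this document:

```cpp
// Hypothetical setup: "users" unsharded on one backend, "orders"
// sharded by user_id across three backends.
ShardMap shard_map;

shard_map.add_table({/*table_name=*/"users",
                     /*shard_key=*/"",          // unsharded
                     /*shards=*/{{"backend_a"}}});

shard_map.add_table({/*table_name=*/"orders",
                     /*shard_key=*/"user_id",
                     /*shards=*/{{"shard_1"}, {"shard_2"}, {"shard_3"}}});

// shard_map.is_sharded("orders")         → true
// shard_map.get_shards("orders").size()  → 3
// shard_map.get_shard_key("orders")      → "user_id"
```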

---

## New Plan Node Types

```cpp
enum class PlanNodeType : uint8_t {
    // ... existing 9 types ...
    REMOTE_SCAN,      // fetch from remote backend via SQL
    MERGE_AGGREGATE,  // merge partial aggregates from N sources
    MERGE_SORT,       // merge N pre-sorted streams
};
```

### REMOTE_SCAN

```cpp
struct {
    const char* backend_name;
    const char* remote_sql;
    uint16_t remote_sql_len;
    const TableInfo* table;  // expected result schema
} remote_scan;
```

### MERGE_AGGREGATE

Sits above N children (RemoteScans), each returning partial aggregates, and merges them by group key.

Children are attached through the plan node's child links: the left pointer holds the first child; the remaining children are either chained via a next pointer or stored in a side array.

Merge operations per aggregate (a code sketch follows the table):

| Original | Remote sends | Local merge |
|---|---|---|
| COUNT(*) | COUNT(*) | SUM of counts |
| COUNT(col) | COUNT(col) | SUM of counts |
| SUM(col) | SUM(col) | SUM of sums |
| AVG(col) | SUM(col), COUNT(col) | SUM(sums) / SUM(counts) |
| MIN(col) | MIN(col) | MIN of mins |
| MAX(col) | MAX(col) | MAX of maxes |
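
A minimal sketch of the AVG merge rule, the only non-trivial case; the accumulator type and helper names are illustrative, not part of the spec:

```cpp
// Each shard contributes one (sum, count) pair per group for a
// rewritten AVG(col); the merge adds them component-wise.
struct AvgPartial {
    double sum = 0.0;
    int64_t count = 0;
};

// Fold one shard's partial row into the group's accumulator.
void merge_avg(AvgPartial& acc, double shard_sum, int64_t shard_count) {
    acc.sum += shard_sum;      // SUM of sums
    acc.count += shard_count;  // SUM of counts
}

// Final value emitted by MergeAggregate: SUM(sums) / SUM(counts).
double finalize_avg(const AvgPartial& acc) {
    return acc.count != 0 ? acc.sum / acc.count : 0.0;  // NULL handling elided
}
```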

### MERGE_SORT

Sits above N children (RemoteScans), each returning pre-sorted results. Performs an N-way merge to produce globally sorted output.

```cpp
struct {
    const AstNode** keys;
    uint8_t* directions;
    uint16_t key_count;
} merge_sort;
```

---

## Distributed Planner

```cpp
template <Dialect D>
class DistributedPlanner {
public:
    DistributedPlanner(const ShardMap& shards, const Catalog& catalog, Arena& arena);

    // Rewrite a logical plan for distributed execution
    PlanNode* distribute(PlanNode* plan);

private:
    const ShardMap& shards_;
    const Catalog& catalog_;
    Arena& arena_;

    PlanNode* distribute_scan(PlanNode* scan_node);
    PlanNode* distribute_join(PlanNode* join_node);
    PlanNode* distribute_aggregate(PlanNode* agg_node, PlanNode* source);
    PlanNode* distribute_sort_limit(PlanNode* sort_or_limit, PlanNode* source);
    PlanNode* distribute_distinct(PlanNode* distinct_node, PlanNode* source);
};
```
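
A sketch of the intended call site. The parse/plan/optimize calls are placeholders for the existing pipeline's entry points, not confirmed APIs:

```cpp
// Illustrative wiring only; D is whichever Dialect the session uses.
DistributedPlanner<D> planner(shard_map, catalog, arena);

// Upstream stages (names stand in for the real pipeline):
PlanNode* logical = optimize(build_plan(parse(sql, arena), catalog, arena));

// Rewrite for distributed execution; the result contains RemoteScan /
// MergeAggregate / MergeSort nodes and runs on the ordinary executor.
PlanNode* distributed = planner.distribute(logical);
```

Per node type, `distribute()` dispatches to the private helpers: scans become RemoteScans (Cases 1–2 below), aggregates become MergeAggregates over per-shard partials (Case 3), sort+limit becomes MergeSort (Case 4), and joins and distinct are handled per Cases 5–6.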

### Decomposition Cases

**Case 1: Single table, unsharded**

The entire sub-plan that touches one unsharded table is converted to a single RemoteScan. Filters, projections, sort, and limit are all pushed into the remote SQL.

```
Filter(age > 18) → Scan(users)   [users → backend_a]

→ RemoteScan(backend_a, "SELECT * FROM users WHERE age > 18")
```

**Case 2: Single table, sharded (N backends)**

Each shard gets its own RemoteScan with the same query. Results are combined with a local UNION ALL (SetOp node).

```
Scan(users)   [users → shard_1, shard_2, shard_3]

→ SetOp(UNION ALL)
    ├── RemoteScan(shard_1, "SELECT * FROM users")
    ├── RemoteScan(shard_2, "SELECT * FROM users")
    └── RemoteScan(shard_3, "SELECT * FROM users")
```

WHERE conditions are pushed into each remote query.

**Case 3: Aggregation on sharded table**

Each shard computes partial aggregates. A MergeAggregate node combines them locally.

```
Aggregate(GROUP BY dept, COUNT(*), AVG(sal))
    └── Scan(users, 3 shards)

→ MergeAggregate(GROUP BY dept, SUM(counts), SUM(sums)/SUM(counts))
    ├── RemoteScan(shard_1, "SELECT dept, COUNT(*), SUM(sal), COUNT(sal) FROM users GROUP BY dept")
    ├── RemoteScan(shard_2, same)
    └── RemoteScan(shard_3, same)
```

AVG is decomposed: each remote sends SUM and COUNT, and the local merge computes SUM(sums) / SUM(counts).

**Case 4: ORDER BY + LIMIT on sharded table**

Each shard returns its own sorted top-N; MergeSort produces globally sorted output, and the outer Limit takes the final top-N. Pushing the limit to each shard is safe because every row in the global top-N must appear in some shard's local top-N.

```
Limit(10) → Sort(name ASC) → Scan(users, 3 shards)

→ Limit(10) → MergeSort(name ASC)
    ├── RemoteScan(shard_1, "SELECT * FROM users ORDER BY name LIMIT 10")
    ├── RemoteScan(shard_2, same)
    └── RemoteScan(shard_3, same)
```

**Case 5: Cross-backend JOIN**

The tables live on different backends. Each side is fetched remotely and the join is performed locally. Filters are pushed to the appropriate remote side.

```
Join(u.id = o.user_id)
    ├── Filter(age > 18) → Scan(users)   [backend_a]
    └── Scan(orders)   [backend_b]

→ Local: NestedLoopJoin(u.id = o.user_id)
    ├── RemoteScan(backend_a, "SELECT * FROM users WHERE age > 18")
    └── RemoteScan(backend_b, "SELECT * FROM orders")
```

**Case 6: DISTINCT on sharded table**

Each shard computes a local DISTINCT, shrinking the transferred data; a local Distinct then deduplicates across shards.

```
Distinct → Project(dept) → Scan(users, 3 shards)

→ Distinct
    └── SetOp(UNION ALL)
        ├── RemoteScan(shard_1, "SELECT DISTINCT dept FROM users")
        ├── RemoteScan(shard_2, same)
        └── RemoteScan(shard_3, same)
```

---

## Remote SQL Generation

```cpp
template <Dialect D>
class RemoteQueryBuilder {
public:
    RemoteQueryBuilder(Arena& arena);

    // Build a SQL string from plan components
    StringRef build_select(const TableInfo* table,
                           const AstNode* where_expr,      // nullable
                           const AstNode** project_exprs,  // nullable
                           uint16_t project_count,
                           const AstNode** group_by,       // nullable
                           uint16_t group_count,
                           const AstNode** order_keys,     // nullable
                           uint8_t* order_dirs,
                           uint16_t order_count,
                           int64_t limit,                  // -1 = no limit
                           bool distinct);
};
```

Uses `StringBuilder` (existing) and `Emitter<D>` (for expression AST nodes) to produce SQL strings. The generated SQL is arena-allocated.
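
A usage sketch producing the per-shard query from Case 4. How the ORDER BY expression is obtained is elided; `name_key` and the direction encoding are assumptions:

```cpp
// Hypothetical: name_key is the ORDER BY expression taken from the
// Sort node; assume 0 encodes ASC in order_dirs.
RemoteQueryBuilder<D> builder(arena);

const AstNode* order_keys[] = {name_key};
uint8_t order_dirs[] = {0};

StringRef sql = builder.build_select(
    users_table,          // expected result schema
    /*where_expr=*/nullptr,
    /*project_exprs=*/nullptr, /*project_count=*/0,  // → SELECT *
    /*group_by=*/nullptr, /*group_count=*/0,
    order_keys, order_dirs, /*order_count=*/1,
    /*limit=*/10,
    /*distinct=*/false);
// → "SELECT * FROM users ORDER BY name LIMIT 10"
```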

---

## Remote Executor Interface

```cpp
class RemoteExecutor {
public:
    virtual ~RemoteExecutor() = default;
    virtual ResultSet execute(const char* backend_name, StringRef sql) = 0;
};
```

For tests: `MockRemoteExecutor` — pre-configured with per-backend data. When `execute()` is called, it parses the incoming SQL using our parser, executes it against in-memory data using our executor, and returns the results. This validates that the remote SQL the planner generates is correct.
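
A minimal sketch of the mock, assuming per-backend catalogs and in-memory data sources; `parse`, `build_plan`, and `execute_plan` stand in for the engine's real entry points:

```cpp
// Sketch only — the pipeline calls are placeholders, not confirmed APIs.
class MockRemoteExecutor : public RemoteExecutor {
public:
    void add_backend(std::string name, Catalog catalog, DataSource data) {
        backends_.emplace(std::move(name),
                          Backend{std::move(catalog), std::move(data)});
    }

    ResultSet execute(const char* backend_name, StringRef sql) override {
        Backend& b = backends_.at(backend_name);
        Arena arena;
        // Run the planner-generated SQL through our own engine against
        // this backend's in-memory data.
        AstNode* ast = parse(sql, arena);
        PlanNode* plan = build_plan(ast, b.catalog, arena);
        return execute_plan(plan, b.data, arena);
    }

private:
    struct Backend { Catalog catalog; DataSource data; };
    std::map<std::string, Backend> backends_;
};
```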

---

## New Operators

### RemoteScanOperator

```cpp
class RemoteScanOperator : public Operator {
    RemoteExecutor* executor_;
    const char* backend_name_;
    StringRef remote_sql_;
    ResultSet results_;  // materialized on open()
    size_t cursor_ = 0;
};
```

On `open()`: call `executor_->execute(backend_name_, remote_sql_)` and store the ResultSet. On `next()`: yield rows from the stored results.
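
A sketch of the two methods; the `Row`/`ResultSet` accessors are assumptions about the existing operator interfaces:

```cpp
// Sketch — signatures follow the pattern of the existing operators.
void RemoteScanOperator::open() {
    // One round trip per RemoteScan; results are fully materialized.
    results_ = executor_->execute(backend_name_, remote_sql_);
    cursor_ = 0;
}

const Row* RemoteScanOperator::next() {
    if (cursor_ >= results_.row_count()) return nullptr;  // exhausted
    return &results_.row(cursor_++);
}
```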

### MergeAggregateOperator

Takes N child operators (each returning partial aggregates) and merges rows by group key.

On `open()`: consume all rows from all children and build a merge map keyed by the group-by values. For each group, combine the partial aggregates (SUM the counts, SUM the sums, MIN of mins, etc.) — a sketch follows below.

On `next()`: yield one row per merged group.
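
A sketch of the `open()` merge loop; `group_key_of` and `apply_merge_rules` are illustrative helpers encoding the per-aggregate rules from the table above:

```cpp
// Sketch — helper names and member types are illustrative.
void MergeAggregateOperator::open() {
    for (Operator* child : children_) {
        child->open();
        while (const Row* row = child->next()) {
            GroupKey key = group_key_of(*row);   // group-by column values
            PartialState& state = groups_[key];  // created on first sight
            // COUNT → add counts; SUM → add sums; MIN/MAX → fold min/max;
            // AVG → accumulate its (sum, count) pair.
            apply_merge_rules(state, *row);
        }
    }
    it_ = groups_.begin();  // next() yields one row per entry
}
```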

### MergeSortOperator

Takes N child operators (each returning pre-sorted rows) and performs an N-way merge.

On `open()`: open all children and peek at the first row of each.

On `next()`: compare the head rows of all children, yield the smallest (or largest for DESC), and advance that child.

Uses a min-heap for an efficient N-way merge: O(log N) per row. A sketch follows.
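
A sketch of the heap-driven `next()`; the heap comparator (which applies `keys_`/`directions_`) and the entry type are assumptions:

```cpp
// Sketch — heap_ holds (current head row, child index) pairs ordered
// by the sort keys, smallest on top; open() seeds it with each
// child's first row.
const Row* MergeSortOperator::next() {
    if (heap_.empty()) return nullptr;  // all streams exhausted
    auto [row, child_idx] = heap_.top();
    heap_.pop();
    if (const Row* following = children_[child_idx]->next()) {
        heap_.emplace(following, child_idx);  // refill from the same child
    }
    return row;  // O(log N) heap work per yielded row
}
```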

---

## File Organization

```
include/sql_engine/
  shard_map.h            — ShardMap, ShardInfo, TableShardConfig
  distributed_planner.h  — DistributedPlanner<D>
  remote_query_builder.h — SQL generation for remote sub-plans
  remote_executor.h      — RemoteExecutor interface
  operators/
    remote_scan_op.h     — RemoteScanOperator
    merge_aggregate_op.h — MergeAggregateOperator
    merge_sort_op.h      — MergeSortOperator

tests/
  test_distributed_planner.cpp — All 6 decomposition cases + correctness
```

---

## Testing Strategy

### MockRemoteExecutor

The mock uses our own engine to execute remote queries:

```cpp
class MockRemoteExecutor : public RemoteExecutor {
    // Maps backend_name → (Catalog + DataSource).
    // On execute(): parse the SQL, build a plan, execute it locally
    // against that backend's data, and return the results.
};
```

This validates both the planner's decomposition and the generated SQL's correctness.

### Decomposition tests

For each of the 6 cases:
1. Set up the shard map
2. Parse SQL → build logical plan → optimize → distribute
3. Walk the distributed plan → verify node types (RemoteScan, MergeAggregate, etc.)
4. Verify the remote SQL strings contain the expected clauses (WHERE, GROUP BY, ORDER BY, LIMIT)

### Correctness tests

For each case (a test-shape sketch follows the list):
1. Execute the query locally (all data in one place) → reference results
2. Execute the distributed plan (data split across mock backends) → distributed results
3. Compare: the results must be identical (same rows, and same order for ORDER BY queries)
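
A test-shape sketch; `run_local`, `run_distributed`, and the row helpers are illustrative test utilities, not existing APIs:

```cpp
// Sketch — the helpers wrap the two pipelines described above.
void check_case(const char* sql, bool ordered) {
    ResultSet expected = run_local(sql);      // all data on one backend
    ResultSet actual = run_distributed(sql);  // data split across mocks

    if (!ordered) {
        // Without ORDER BY, row order is unspecified; canonicalize
        // both sides before comparing.
        sort_rows(expected);
        sort_rows(actual);
    }
    assert(rows_equal(expected, actual));
}
```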

### Test data setup

Create a "users" table with ~20 rows, split across 3 mock backends. Include variety: different ages, departments, and names, so that filtering, grouping, and sorting are all meaningfully exercised.

---

## Performance Targets

| Operation | Target |
|---|---|
| Distribute simple unsharded query | <5µs |
| Distribute sharded query (3 shards) | <20µs |
| Remote SQL generation | <2µs |
| MergeSort (3 streams × 100 rows) | <100µs |
| MergeAggregate (3 streams × 10 groups) | <50µs |