@@ -80,6 +80,13 @@ class PlanExecutor {
8080 remote_executor_ = exec;
8181 }
8282
83+ // Enable parallel opening of RemoteScan children in merge/set operators.
84+ // Only safe when the RemoteExecutor is thread-safe (e.g. ThreadSafeMultiRemoteExecutor
85+ // with connection pooling). Disabled by default for backward compatibility.
86+ void set_parallel_open (bool enabled) {
87+ parallel_open_enabled_ = enabled;
88+ }
89+
8390 // Access the subquery executor (for operators that need it)
8491 SubqueryExecutor<D>* subquery_executor () { return &subquery_exec_; }
8592
@@ -294,6 +301,7 @@ class PlanExecutor {
294301 std::unordered_map<std::string, MutableDataSource*> mutable_sources_;
295302 std::vector<std::unique_ptr<Operator>> operators_;
296303 RemoteExecutor* remote_executor_ = nullptr ;
304+ bool parallel_open_enabled_ = false ;
297305 DistributeFn distribute_fn_;
298306 SubqueryExecutor<D> subquery_exec_;
299307 sql_parser::Arena subquery_plan_arena_{65536 , 1048576 };
@@ -1011,8 +1019,12 @@ class PlanExecutor {
10111019 Operator* right = build_operator (node->right );
10121020 if (!left || !right) return nullptr ;
10131021
1022+ // Enable parallel open when both children are remote scans and executor is thread-safe
1023+ bool parallel = parallel_open_enabled_ &&
1024+ (node->left && node->left ->type == PlanNodeType::REMOTE_SCAN &&
1025+ node->right && node->right ->type == PlanNodeType::REMOTE_SCAN);
10141026 auto op = std::make_unique<SetOpOperator>(
1015- left, right, node->set_op .op , node->set_op .all );
1027+ left, right, node->set_op .op , node->set_op .all , parallel );
10161028 Operator* ptr = op.get ();
10171029 operators_.push_back (std::move (op));
10181030 return ptr;
@@ -1037,12 +1049,16 @@ class PlanExecutor {
10371049 }
10381050 if (children.empty ()) return nullptr ;
10391051
1052+ // Enable parallel open when children are RemoteScans and executor is thread-safe
1053+ bool parallel = parallel_open_enabled_ &&
1054+ (children.size () > 1 ) && has_remote_scan_children (node);
10401055 auto op = std::make_unique<MergeAggregateOperator>(
10411056 std::move (children),
10421057 node->merge_aggregate .group_key_count ,
10431058 node->merge_aggregate .merge_ops ,
10441059 node->merge_aggregate .merge_op_count ,
1045- arena_);
1060+ arena_,
1061+ parallel);
10461062 Operator* ptr = op.get ();
10471063 operators_.push_back (std::move (op));
10481064 return ptr;
@@ -1074,16 +1090,40 @@ class PlanExecutor {
10741090 sort_dirs.push_back (node->merge_sort .directions [i]);
10751091 }
10761092
1093+ // Enable parallel open when children are RemoteScans and executor is thread-safe
1094+ bool parallel = parallel_open_enabled_ &&
1095+ (children.size () > 1 ) && has_remote_scan_children_merge_sort (node);
10771096 auto op = std::make_unique<MergeSortOperator>(
10781097 std::move (children),
10791098 sort_col_indices.data (),
10801099 sort_dirs.data (),
1081- node->merge_sort .key_count );
1100+ node->merge_sort .key_count ,
1101+ parallel);
10821102 Operator* ptr = op.get ();
10831103 operators_.push_back (std::move (op));
10841104 return ptr;
10851105 }
10861106
1107+ // Check whether all children of a MERGE_AGGREGATE node are REMOTE_SCAN.
1108+ static bool has_remote_scan_children (const PlanNode* node) {
1109+ if (!node || node->type != PlanNodeType::MERGE_AGGREGATE) return false ;
1110+ for (uint16_t i = 0 ; i < node->merge_aggregate .child_count ; ++i) {
1111+ if (node->merge_aggregate .children [i]->type != PlanNodeType::REMOTE_SCAN)
1112+ return false ;
1113+ }
1114+ return node->merge_aggregate .child_count > 0 ;
1115+ }
1116+
1117+ // Check whether all children of a MERGE_SORT node are REMOTE_SCAN.
1118+ static bool has_remote_scan_children_merge_sort (const PlanNode* node) {
1119+ if (!node || node->type != PlanNodeType::MERGE_SORT) return false ;
1120+ for (uint16_t i = 0 ; i < node->merge_sort .child_count ; ++i) {
1121+ if (node->merge_sort .children [i]->type != PlanNodeType::REMOTE_SCAN)
1122+ return false ;
1123+ }
1124+ return node->merge_sort .child_count > 0 ;
1125+ }
1126+
10871127 uint16_t resolve_column_index (const sql_parser::AstNode* key, const TableInfo* table) {
10881128 if (!key || !table) return 0 ;
10891129 sql_parser::StringRef col_name;
0 commit comments