From 01d8b03ac19bc611c2687b2cf7bc03f2cf0bee68 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Thu, 23 Apr 2026 17:09:35 +0200 Subject: [PATCH 1/5] Merge pull request #1527 from Altinity/feature/antalya-26.1/json_part2 Cluster Joins part 2 - global mode --- src/Core/Settings.cpp | 2 +- src/Storages/IStorageCluster.cpp | 84 +++++++++++++++++------ src/Storages/IStorageCluster.h | 7 +- src/Storages/buildQueryTreeForShard.cpp | 43 ++++++++++-- src/Storages/buildQueryTreeForShard.h | 6 +- tests/integration/test_s3_cluster/test.py | 26 ++++--- 6 files changed, 127 insertions(+), 41 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index c9994afb205e..2e8dc20af363 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1969,7 +1969,7 @@ ClickHouse applies this setting when the query contains the product of object st Possible values: - `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` -- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` Right table executes first and is added to the secondary query as temporay table. - `allow` — Default value. Allows the use of these types of subqueries. )", 0) \ \ diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7ae4d85d92f2..13d597d7b0a5 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -17,12 +17,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -110,11 +112,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} + explicit SearcherVisitor(std::unordered_set types_, size_t entry_, ContextPtr context) + : Base(context) + , types(types_) + , entry(entry_) {} bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/) { - return getSubqueryDepth() <= 2 && !passed_node; + return getSubqueryDepth() <= 2 && !passed_node && !current_entry; } void enterImpl(QueryTreeNodePtr & node) @@ -125,13 +130,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); if (types.contains(node_type)) - passed_node = node; + { + ++current_entry; + if (current_entry == entry) + passed_node = node; + } } QueryTreeNodePtr getNode() const { return passed_node; } private: std::unordered_set types; + size_t entry; + size_t current_entry = 0; QueryTreeNodePtr passed_node; }; @@ -198,15 +209,24 @@ Converts localtable as t ON s3.key == t.key -to +to (object_storage_cluster_join_mode='local') SELECT s3.c1, s3.c2, s3.key FROM s3Cluster(...) AS s3 + +or (object_storage_cluster_join_mode='global') + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) as s3 + JOIN + values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t + ON s3.key == t.key */ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( ASTPtr & query_to_send, - QueryTreeNodePtr query_tree, + SelectQueryInfo query_info, const ContextPtr & context) { auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; @@ -214,17 +234,17 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto info = getQueryTreeInfo(query_tree, context); + auto info = getQueryTreeInfo(query_info.query_tree, context); if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { - auto modified_query_tree = query_tree->clone(); + auto modified_query_tree = query_info.query_tree->clone(); - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(modified_query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node"); QueryTreeNodePtr query_tree_distributed; @@ -237,7 +257,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( } else if (info.has_cross_join) { - SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context); join_searcher.visit(modified_query_tree); auto cross_join_node = join_searcher.getNode(); if (!cross_join_node) @@ -292,8 +312,24 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( return; } case ObjectStorageClusterJoinMode::GLOBAL: - // TODO - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + { + auto modified_query_tree = query_info.query_tree->clone(); + + rewriteJoinToGlobalJoin(modified_query_tree, context); + modified_query_tree = buildQueryTreeForShard( + query_info.planner_context, + modified_query_tree, + /*allow_global_join_for_right_table*/ false, + /*find_cross_join*/ true); + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + } + + return; + } case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special return; } @@ -327,7 +363,7 @@ void IStorageCluster::read( /// rewrite query to execute `remote('remote_host', s3(...))` /// remote_host can execute query itself or make on-cluster query depends on own `object_storage_cluster` setting updateConfigurationIfNeeded(context); - updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context, /*make_cluster_function*/ false); auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; @@ -356,7 +392,7 @@ void IStorageCluster::read( SharedHeader sample_block; - updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); if (settings[Setting::allow_experimental_analyzer]) { @@ -408,6 +444,10 @@ void IStorageCluster::read( auto this_ptr = std::static_pointer_cast(shared_from_this()); + std::optional external_tables = std::nullopt; + if (query_info.planner_context && query_info.planner_context->getMutableQueryContext()) + external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); + auto reading = std::make_unique( column_names, query_info, @@ -418,7 +458,8 @@ void IStorageCluster::read( std::move(query_to_send), processed_stage, cluster, - log); + log, + external_tables); query_plan.addStep(std::move(reading)); } @@ -558,7 +599,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const new_context, /*throttler=*/nullptr, scalars, - Tables(), + external_tables.has_value() ? *external_tables : Tables(), processed_stage, nullptr, RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}, @@ -597,7 +638,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt info.has_cross_join = true; } - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) @@ -632,9 +673,12 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - auto info = getQueryTreeInfo(query_info.query_tree, context); - if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) - return QueryProcessingStage::Enum::FetchColumns; + if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL) + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } } /// Initiator executes query on remote node. diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2c0c6d3c6029..6bc8e2a8ada4 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -67,7 +67,7 @@ class IStorageCluster : public IStorage const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/, bool /*make_cluster_function*/) {} - void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context); virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} @@ -141,7 +141,8 @@ class ReadFromCluster : public SourceStepWithFilter ASTPtr query_to_send_, QueryProcessingStage::Enum processed_stage_, ClusterPtr cluster_, - LoggerPtr log_) + LoggerPtr log_, + std::optional external_tables_) : SourceStepWithFilter( std::move(sample_block), column_names_, @@ -153,6 +154,7 @@ class ReadFromCluster : public SourceStepWithFilter , processed_stage(processed_stage_) , cluster(std::move(cluster_)) , log(log_) + , external_tables(external_tables_) { } @@ -164,6 +166,7 @@ class ReadFromCluster : public SourceStepWithFilter LoggerPtr log; std::optional extension; + std::optional external_tables; void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index efb7d426b4fe..168847f03cec 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -43,6 +43,7 @@ namespace Setting extern const SettingsBool prefer_global_in_and_join; extern const SettingsBool enable_add_distinct_to_in_subqueries; extern const SettingsInt64 optimize_const_name_size; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } namespace ErrorCodes @@ -121,8 +122,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito using Base = InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_) + explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_) : Base(context_) + , find_cross_join(find_cross_join_) {} struct InFunctionOrJoin @@ -158,9 +160,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito { auto * function_node = node->as(); auto * join_node = node->as(); + CrossJoinNode * cross_join_node = find_cross_join ? node->as() : nullptr; if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) || - (join_node && join_node->getLocality() == JoinLocality::Global)) + (join_node && join_node->getLocality() == JoinLocality::Global) || + cross_join_node) { InFunctionOrJoin in_function_or_join_entry; in_function_or_join_entry.query_node = node; @@ -224,7 +228,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers); replacement_map.emplace(table_node.get(), std::move(replacement_table_expression)); } - else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) && + else if ((distributed_product_mode == DistributedProductMode::GLOBAL || + getSettings()[Setting::prefer_global_in_and_join] || + (find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) && !in_function_or_join_stack.empty()) { auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get(); @@ -258,6 +264,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito std::vector in_function_or_join_stack; std::unordered_map replacement_map; std::vector global_in_or_join_nodes; + + bool find_cross_join = false; }; /** Replaces large constant values with `__getScalar` function calls to avoid @@ -558,14 +566,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression( } -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table) +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join) { CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor; collect_column_source_to_columns_visitor.visit(query_tree_to_modify); const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns(); - DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext()); + DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join); visitor.visit(query_tree_to_modify); auto replacement_map = visitor.getReplacementMap(); @@ -627,6 +639,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); continue; } + if (auto * cross_join_node = global_in_or_join_node.query_node->as()) + { + auto tables_count = cross_join_node->getTableExpressions().size(); + for (size_t i = 1; i < tables_count; ++i) + { + QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i]; + + auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext()); + + auto temporary_table_expression_node = executeSubqueryNode(subquery_node, + planner_context->getMutableQueryContext(), + global_in_or_join_node.subquery_depth); + temporary_table_expression_node->setAlias(join_table_expression->getAlias()); + + replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); + } + continue; + } if (auto * in_function_node = global_in_or_join_node.query_node->as()) { auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1); @@ -738,7 +768,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext { if (auto * join_node = node->as()) { - bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]; + bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join] + && getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL; bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression()); if (should_use_global_join) join_node->setLocality(JoinLocality::Global); diff --git a/src/Storages/buildQueryTreeForShard.h b/src/Storages/buildQueryTreeForShard.h index 90cbfd36f660..bcbac10b55e0 100644 --- a/src/Storages/buildQueryTreeForShard.h +++ b/src/Storages/buildQueryTreeForShard.h @@ -16,7 +16,11 @@ using PlannerContextPtr = std::shared_ptr; class Context; using ContextPtr = std::shared_ptr; -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table); +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join = false); void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index b990c0709f08..107e233e9e79 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -140,7 +140,7 @@ def started_cluster(): yield cluster finally: - shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/")) + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) cluster.shutdown() @@ -1171,7 +1171,8 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): node.query("SET allow_experimental_analyzer = DEFAULT") -def test_joins(started_cluster): +@pytest.mark.parametrize("join_mode", ["local", "global"]) +def test_joins(started_cluster, join_mode): node = started_cluster.instances["s0_0_0"] # Table join_table only exists on the node 's0_0_0'. @@ -1205,7 +1206,7 @@ def test_joins(started_cluster): join_table AS t2 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1228,7 +1229,7 @@ def test_joins(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1246,7 +1247,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1265,7 +1266,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t2.id % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1283,27 +1284,29 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) AND ((t2.id % 3) == 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + # With WHERE clause with global subquery result6 = node.query( f""" SELECT name FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - WHERE value IN (SELECT id FROM join_table) + WHERE value GLOBAL IN (SELECT id FROM join_table) ORDER BY name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result6.splitlines())) assert len(res) == 25 + # With WHERE clause without columns in condition result7 = node.query( f""" SELECT count() FROM @@ -1314,11 +1317,12 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) assert result7.strip() == "625" + # With WHERE clause without columns in condition and with local column in SELECT result8 = node.query( f""" SELECT count(), t2.id FROM @@ -1329,7 +1333,7 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result8.splitlines())) From 3926fa338b27c4244d41ed03fd9c974c5b186bed Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 12 May 2026 19:10:58 +0200 Subject: [PATCH 2/5] Fix global IN --- src/Storages/IStorageCluster.cpp | 4 ++ src/Storages/StorageDistributed.cpp | 52 +----------------- src/Storages/buildQueryTreeForShard.cpp | 53 +++++++++++++++++++ src/Storages/buildQueryTreeForShard.h | 1 + .../integration/test_database_iceberg/test.py | 13 ++--- tests/integration/test_s3_cluster/test.py | 15 ++++++ .../test_cluster_joins.py | 51 +++++++++++++++--- 7 files changed, 124 insertions(+), 65 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 13d597d7b0a5..c6bdbac7f621 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -320,6 +320,10 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto modified_query_tree = query_info.query_tree->clone(); rewriteJoinToGlobalJoin(modified_query_tree, context); + + if (info.has_local_columns_in_where) + rewriteInToGlobalIn(modified_query_tree, context); + modified_query_tree = buildQueryTreeForShard( query_info.planner_context, modified_query_tree, diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 3ee51f8bb1f3..d5a2d7ce2eb1 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -956,53 +956,6 @@ class ReplaceColumnNodesForTableExpressionVisitor : public InDepthQueryTreeVisit const ColumnNameToColumnNodeMap & column_name_to_node; }; -class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext -{ -public: - using Base = InDepthQueryTreeVisitorWithContext; - using Base::Base; - - void enterImpl(QueryTreeNodePtr & node) - { - if (auto * function_node = node->as(); function_node && isNameOfLocalInFunction(function_node->getFunctionName())) - { - auto * query = function_node->getArguments().getNodes()[1]->as(); - if (!query) - return; - bool no_replace = true; - for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true)) - { - const StorageDistributed * storage_distributed = nullptr; - if (const TableNode * table_node_typed = table_node->as()) - storage_distributed = typeid_cast(table_node_typed->getStorage().get()); - else if (const TableFunctionNode * table_function_node_typed = table_node->as()) - storage_distributed = typeid_cast(table_function_node_typed->getStorage().get()); - - if (!storage_distributed) - { - no_replace = false; - break; - } - } - if (no_replace) - return; - - auto result_function = std::make_shared(getGlobalInFunctionNameForLocalInFunctionName(function_node->getFunctionName())); - result_function->getArguments().getNodes() = std::move(function_node->getArguments().getNodes()); - resolveOrdinaryFunctionNodeByName(*result_function, result_function->getFunctionName(), getContext()); - node = result_function; - } - } - - static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr &) - { - if (auto * function_node = parent->as(); function_node && function_node->getFunctionName().starts_with("global")) - return false; - - return true; - } -}; - bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree) { bool rewrite = false; @@ -1163,10 +1116,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, { auto & query_node = query_tree_to_modify->as(); if (query_node.hasWhere()) - { - RewriteInToGlobalInVisitor visitor(query_context); - visitor.visit(query_node.getWhere()); - } + rewriteInToGlobalIn(query_node.getWhere(), query_context); rewriteJoinToGlobalJoinIfNeeded(query_node.getJoinTree()); } diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index 168847f03cec..39fbc8648457 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -792,4 +792,57 @@ void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr c visitor.visit(query_tree_to_modify); } +class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + void enterImpl(QueryTreeNodePtr & node) + { + if (auto * function_node = node->as(); function_node && isNameOfLocalInFunction(function_node->getFunctionName())) + { + auto * query = function_node->getArguments().getNodes()[1]->as(); + if (!query) + return; + bool no_replace = true; + for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true)) + { + const StorageDistributed * storage_distributed = nullptr; + if (const TableNode * table_node_typed = table_node->as()) + storage_distributed = typeid_cast(table_node_typed->getStorage().get()); + else if (const TableFunctionNode * table_function_node_typed = table_node->as()) + storage_distributed = typeid_cast(table_function_node_typed->getStorage().get()); + + if (!storage_distributed) + { + no_replace = false; + break; + } + } + if (no_replace) + return; + + auto result_function = std::make_shared(getGlobalInFunctionNameForLocalInFunctionName(function_node->getFunctionName())); + result_function->getArguments().getNodes() = std::move(function_node->getArguments().getNodes()); + resolveOrdinaryFunctionNodeByName(*result_function, result_function->getFunctionName(), getContext()); + node = result_function; + } + } + + static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr &) + { + if (auto * function_node = parent->as(); function_node && function_node->getFunctionName().starts_with("global")) + return false; + + return true; + } +}; + +void rewriteInToGlobalIn(QueryTreeNodePtr query_tree_to_modify, ContextPtr context) +{ + RewriteInToGlobalInVisitor visitor(context); + visitor.visit(query_tree_to_modify); +} + } diff --git a/src/Storages/buildQueryTreeForShard.h b/src/Storages/buildQueryTreeForShard.h index bcbac10b55e0..7c4658a76cf1 100644 --- a/src/Storages/buildQueryTreeForShard.h +++ b/src/Storages/buildQueryTreeForShard.h @@ -23,5 +23,6 @@ QueryTreeNodePtr buildQueryTreeForShard( bool find_cross_join = false); void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context); +void rewriteInToGlobalIn(QueryTreeNodePtr query_tree_to_modify, ContextPtr context); } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 167b7d22708c..d81a1be96aaa 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -913,7 +913,8 @@ def create_namespace(suffix): assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table_name}`") -def test_cluster_joins(started_cluster): +@pytest.mark.parametrize("join_mode", ["local", "global"]) +def test_cluster_joins(started_cluster, join_mode): node = started_cluster.instances["node1"] test_ref = f"test_join_tables_{uuid.uuid4()}" @@ -981,7 +982,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -998,7 +999,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -1014,7 +1015,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -1031,7 +1032,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -1046,7 +1047,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 107e233e9e79..675b89acd007 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1291,6 +1291,21 @@ def test_joins(started_cluster, join_mode): res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + # With WHERE clause with global subquery + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='{join_mode}'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 + # With WHERE clause with global subquery result6 = node.query( f""" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py index c04940c1eeb8..7a7e43949c93 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -6,8 +6,9 @@ execute_spark_query_general, ) +@pytest.mark.parametrize("join_mode", ["local", "global"]) @pytest.mark.parametrize("storage_type", ["s3", "azure"]) -def test_cluster_joins(started_cluster_iceberg_with_spark, storage_type): +def test_cluster_joins(started_cluster_iceberg_with_spark, storage_type, join_mode): instance = started_cluster_iceberg_with_spark.instances["node1"] spark = started_cluster_iceberg_with_spark.spark_session TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() @@ -81,7 +82,7 @@ def execute_spark_query(query: str, table_name): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -91,14 +92,31 @@ def execute_spark_query(query: str, table_name): f""" SELECT name FROM {creation_expression} - WHERE tag in ( + WHERE tag IN ( SELECT id FROM {creation_expression_2} ) ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag GLOBAL IN ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -114,7 +132,7 @@ def execute_spark_query(query: str, table_name): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -124,14 +142,31 @@ def execute_spark_query(query: str, table_name): f""" SELECT name FROM {creation_expression} - WHERE tag in ( + WHERE tag IN ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag GLOBAL IN ( SELECT id FROM `{TABLE_NAME_LOCAL}` ) ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -146,7 +181,7 @@ def execute_spark_query(query: str, table_name): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) From fb33a04abbe34bc8b688d9b621618008e9d2fff8 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 12 May 2026 19:47:36 +0200 Subject: [PATCH 3/5] Fix cross join replacement --- src/Storages/buildQueryTreeForShard.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index 39fbc8648457..076af289c292 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -653,6 +653,23 @@ QueryTreeNodePtr buildQueryTreeForShard( global_in_or_join_node.subquery_depth); temporary_table_expression_node->setAlias(join_table_expression->getAlias()); + std::vector descendants_to_map; + for (const auto & child : join_table_expression->getChildren()) + if (child) + descendants_to_map.push_back(child.get()); + + while (!descendants_to_map.empty()) + { + const auto * descendant = descendants_to_map.back(); + descendants_to_map.pop_back(); + + replacement_map.emplace(descendant, temporary_table_expression_node); + + for (const auto & child : descendant->getChildren()) + if (child) + descendants_to_map.push_back(child.get()); + } + replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); } continue; From b63ad834244b4e1197f16fb757530ea185eeecc4 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 12 May 2026 20:03:41 +0200 Subject: [PATCH 4/5] Fix rewriteInToGlobalIn --- src/Storages/buildQueryTreeForShard.cpp | 2 +- src/Storages/buildQueryTreeForShard.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index 076af289c292..c2a86d86952b 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -856,7 +856,7 @@ class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext Date: Fri, 15 May 2026 18:48:54 +0200 Subject: [PATCH 5/5] Fix cluster function global join with distributed table --- src/Storages/IStorageCluster.cpp | 2 +- src/Storages/buildQueryTreeForShard.cpp | 37 ++++++++----- src/Storages/buildQueryTreeForShard.h | 2 +- .../conftest.py | 3 + .../test_cluster_joins.py | 55 +++++++++++++++++++ 5 files changed, 83 insertions(+), 16 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index c6bdbac7f621..86cbadd7015d 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -322,7 +322,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( rewriteJoinToGlobalJoin(modified_query_tree, context); if (info.has_local_columns_in_where) - rewriteInToGlobalIn(modified_query_tree, context); + rewriteInToGlobalIn(modified_query_tree, context, /*rewrite_for_distributed*/ true); modified_query_tree = buildQueryTreeForShard( query_info.planner_context, diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index c2a86d86952b..b7aa5420c7f6 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -822,23 +822,26 @@ class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContextgetArguments().getNodes()[1]->as(); if (!query) return; - bool no_replace = true; - for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true)) + if (!rewrite_for_distributed) { - const StorageDistributed * storage_distributed = nullptr; - if (const TableNode * table_node_typed = table_node->as()) - storage_distributed = typeid_cast(table_node_typed->getStorage().get()); - else if (const TableFunctionNode * table_function_node_typed = table_node->as()) - storage_distributed = typeid_cast(table_function_node_typed->getStorage().get()); - - if (!storage_distributed) + bool no_replace = true; + for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true)) { - no_replace = false; - break; + const StorageDistributed * storage_distributed = nullptr; + if (const TableNode * table_node_typed = table_node->as()) + storage_distributed = typeid_cast(table_node_typed->getStorage().get()); + else if (const TableFunctionNode * table_function_node_typed = table_node->as()) + storage_distributed = typeid_cast(table_function_node_typed->getStorage().get()); + + if (!storage_distributed) + { + no_replace = false; + break; + } } + if (no_replace) + return; } - if (no_replace) - return; auto result_function = std::make_shared(getGlobalInFunctionNameForLocalInFunctionName(function_node->getFunctionName())); result_function->getArguments().getNodes() = std::move(function_node->getArguments().getNodes()); @@ -854,11 +857,17 @@ class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext