diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index c9994afb205e..2e8dc20af363 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1969,7 +1969,7 @@ ClickHouse applies this setting when the query contains the product of object st Possible values: - `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` -- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` Right table executes first and is added to the secondary query as temporay table. - `allow` — Default value. Allows the use of these types of subqueries. )", 0) \ \ diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7ae4d85d92f2..86cbadd7015d 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -17,12 +17,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -110,11 +112,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} + explicit SearcherVisitor(std::unordered_set types_, size_t entry_, ContextPtr context) + : Base(context) + , types(types_) + , entry(entry_) {} bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/) { - return getSubqueryDepth() <= 2 && !passed_node; + return getSubqueryDepth() <= 2 && !passed_node && !current_entry; } void enterImpl(QueryTreeNodePtr & node) @@ -125,13 +130,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); if (types.contains(node_type)) - passed_node = node; + { + ++current_entry; + if (current_entry == entry) + passed_node = node; + } } QueryTreeNodePtr getNode() const { return passed_node; } private: std::unordered_set types; + size_t entry; + size_t current_entry = 0; QueryTreeNodePtr passed_node; }; @@ -198,15 +209,24 @@ Converts localtable as t ON s3.key == t.key -to +to (object_storage_cluster_join_mode='local') SELECT s3.c1, s3.c2, s3.key FROM s3Cluster(...) AS s3 + +or (object_storage_cluster_join_mode='global') + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) as s3 + JOIN + values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t + ON s3.key == t.key */ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( ASTPtr & query_to_send, - QueryTreeNodePtr query_tree, + SelectQueryInfo query_info, const ContextPtr & context) { auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; @@ -214,17 +234,17 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto info = getQueryTreeInfo(query_tree, context); + auto info = getQueryTreeInfo(query_info.query_tree, context); if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { - auto modified_query_tree = query_tree->clone(); + auto modified_query_tree = query_info.query_tree->clone(); - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(modified_query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node"); QueryTreeNodePtr query_tree_distributed; @@ -237,7 +257,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( } else if (info.has_cross_join) { - SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context); join_searcher.visit(modified_query_tree); auto cross_join_node = join_searcher.getNode(); if (!cross_join_node) @@ -292,8 +312,28 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( return; } case ObjectStorageClusterJoinMode::GLOBAL: - // TODO - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + { + auto modified_query_tree = query_info.query_tree->clone(); + + rewriteJoinToGlobalJoin(modified_query_tree, context); + + if (info.has_local_columns_in_where) + rewriteInToGlobalIn(modified_query_tree, context, /*rewrite_for_distributed*/ true); + + modified_query_tree = buildQueryTreeForShard( + query_info.planner_context, + modified_query_tree, + /*allow_global_join_for_right_table*/ false, + /*find_cross_join*/ true); + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + } + + return; + } case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special return; } @@ -327,7 +367,7 @@ void IStorageCluster::read( /// rewrite query to execute `remote('remote_host', s3(...))` /// remote_host can execute query itself or make on-cluster query depends on own `object_storage_cluster` setting updateConfigurationIfNeeded(context); - updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context, /*make_cluster_function*/ false); auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; @@ -356,7 +396,7 @@ void IStorageCluster::read( SharedHeader sample_block; - updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); if (settings[Setting::allow_experimental_analyzer]) { @@ -408,6 +448,10 @@ void IStorageCluster::read( auto this_ptr = std::static_pointer_cast(shared_from_this()); + std::optional external_tables = std::nullopt; + if (query_info.planner_context && query_info.planner_context->getMutableQueryContext()) + external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); + auto reading = std::make_unique( column_names, query_info, @@ -418,7 +462,8 @@ void IStorageCluster::read( std::move(query_to_send), processed_stage, cluster, - log); + log, + external_tables); query_plan.addStep(std::move(reading)); } @@ -558,7 +603,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const new_context, /*throttler=*/nullptr, scalars, - Tables(), + external_tables.has_value() ? *external_tables : Tables(), processed_stage, nullptr, RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}, @@ -597,7 +642,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt info.has_cross_join = true; } - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) @@ -632,9 +677,12 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - auto info = getQueryTreeInfo(query_info.query_tree, context); - if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) - return QueryProcessingStage::Enum::FetchColumns; + if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL) + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } } /// Initiator executes query on remote node. diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2c0c6d3c6029..6bc8e2a8ada4 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -67,7 +67,7 @@ class IStorageCluster : public IStorage const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/, bool /*make_cluster_function*/) {} - void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context); virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} @@ -141,7 +141,8 @@ class ReadFromCluster : public SourceStepWithFilter ASTPtr query_to_send_, QueryProcessingStage::Enum processed_stage_, ClusterPtr cluster_, - LoggerPtr log_) + LoggerPtr log_, + std::optional external_tables_) : SourceStepWithFilter( std::move(sample_block), column_names_, @@ -153,6 +154,7 @@ class ReadFromCluster : public SourceStepWithFilter , processed_stage(processed_stage_) , cluster(std::move(cluster_)) , log(log_) + , external_tables(external_tables_) { } @@ -164,6 +166,7 @@ class ReadFromCluster : public SourceStepWithFilter LoggerPtr log; std::optional extension; + std::optional external_tables; void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 3ee51f8bb1f3..d5a2d7ce2eb1 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -956,53 +956,6 @@ class ReplaceColumnNodesForTableExpressionVisitor : public InDepthQueryTreeVisit const ColumnNameToColumnNodeMap & column_name_to_node; }; -class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext -{ -public: - using Base = InDepthQueryTreeVisitorWithContext; - using Base::Base; - - void enterImpl(QueryTreeNodePtr & node) - { - if (auto * function_node = node->as(); function_node && isNameOfLocalInFunction(function_node->getFunctionName())) - { - auto * query = function_node->getArguments().getNodes()[1]->as(); - if (!query) - return; - bool no_replace = true; - for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true)) - { - const StorageDistributed * storage_distributed = nullptr; - if (const TableNode * table_node_typed = table_node->as()) - storage_distributed = typeid_cast(table_node_typed->getStorage().get()); - else if (const TableFunctionNode * table_function_node_typed = table_node->as()) - storage_distributed = typeid_cast(table_function_node_typed->getStorage().get()); - - if (!storage_distributed) - { - no_replace = false; - break; - } - } - if (no_replace) - return; - - auto result_function = std::make_shared(getGlobalInFunctionNameForLocalInFunctionName(function_node->getFunctionName())); - result_function->getArguments().getNodes() = std::move(function_node->getArguments().getNodes()); - resolveOrdinaryFunctionNodeByName(*result_function, result_function->getFunctionName(), getContext()); - node = result_function; - } - } - - static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr &) - { - if (auto * function_node = parent->as(); function_node && function_node->getFunctionName().starts_with("global")) - return false; - - return true; - } -}; - bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree) { bool rewrite = false; @@ -1163,10 +1116,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, { auto & query_node = query_tree_to_modify->as(); if (query_node.hasWhere()) - { - RewriteInToGlobalInVisitor visitor(query_context); - visitor.visit(query_node.getWhere()); - } + rewriteInToGlobalIn(query_node.getWhere(), query_context); rewriteJoinToGlobalJoinIfNeeded(query_node.getJoinTree()); } diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index efb7d426b4fe..b7aa5420c7f6 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -43,6 +43,7 @@ namespace Setting extern const SettingsBool prefer_global_in_and_join; extern const SettingsBool enable_add_distinct_to_in_subqueries; extern const SettingsInt64 optimize_const_name_size; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } namespace ErrorCodes @@ -121,8 +122,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito using Base = InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_) + explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_) : Base(context_) + , find_cross_join(find_cross_join_) {} struct InFunctionOrJoin @@ -158,9 +160,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito { auto * function_node = node->as(); auto * join_node = node->as(); + CrossJoinNode * cross_join_node = find_cross_join ? node->as() : nullptr; if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) || - (join_node && join_node->getLocality() == JoinLocality::Global)) + (join_node && join_node->getLocality() == JoinLocality::Global) || + cross_join_node) { InFunctionOrJoin in_function_or_join_entry; in_function_or_join_entry.query_node = node; @@ -224,7 +228,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers); replacement_map.emplace(table_node.get(), std::move(replacement_table_expression)); } - else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) && + else if ((distributed_product_mode == DistributedProductMode::GLOBAL || + getSettings()[Setting::prefer_global_in_and_join] || + (find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) && !in_function_or_join_stack.empty()) { auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get(); @@ -258,6 +264,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito std::vector in_function_or_join_stack; std::unordered_map replacement_map; std::vector global_in_or_join_nodes; + + bool find_cross_join = false; }; /** Replaces large constant values with `__getScalar` function calls to avoid @@ -558,14 +566,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression( } -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table) +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join) { CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor; collect_column_source_to_columns_visitor.visit(query_tree_to_modify); const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns(); - DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext()); + DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join); visitor.visit(query_tree_to_modify); auto replacement_map = visitor.getReplacementMap(); @@ -627,6 +639,41 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); continue; } + if (auto * cross_join_node = global_in_or_join_node.query_node->as()) + { + auto tables_count = cross_join_node->getTableExpressions().size(); + for (size_t i = 1; i < tables_count; ++i) + { + QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i]; + + auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext()); + + auto temporary_table_expression_node = executeSubqueryNode(subquery_node, + planner_context->getMutableQueryContext(), + global_in_or_join_node.subquery_depth); + temporary_table_expression_node->setAlias(join_table_expression->getAlias()); + + std::vector descendants_to_map; + for (const auto & child : join_table_expression->getChildren()) + if (child) + descendants_to_map.push_back(child.get()); + + while (!descendants_to_map.empty()) + { + const auto * descendant = descendants_to_map.back(); + descendants_to_map.pop_back(); + + replacement_map.emplace(descendant, temporary_table_expression_node); + + for (const auto & child : descendant->getChildren()) + if (child) + descendants_to_map.push_back(child.get()); + } + + replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); + } + continue; + } if (auto * in_function_node = global_in_or_join_node.query_node->as()) { auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1); @@ -738,7 +785,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext { if (auto * join_node = node->as()) { - bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]; + bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join] + && getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL; bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression()); if (should_use_global_join) join_node->setLocality(JoinLocality::Global); @@ -761,4 +809,66 @@ void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr c visitor.visit(query_tree_to_modify); } +class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + void enterImpl(QueryTreeNodePtr & node) + { + if (auto * function_node = node->as(); function_node && isNameOfLocalInFunction(function_node->getFunctionName())) + { + auto * query = function_node->getArguments().getNodes()[1]->as(); + if (!query) + return; + if (!rewrite_for_distributed) + { + bool no_replace = true; + for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true)) + { + const StorageDistributed * storage_distributed = nullptr; + if (const TableNode * table_node_typed = table_node->as()) + storage_distributed = typeid_cast(table_node_typed->getStorage().get()); + else if (const TableFunctionNode * table_function_node_typed = table_node->as()) + storage_distributed = typeid_cast(table_function_node_typed->getStorage().get()); + + if (!storage_distributed) + { + no_replace = false; + break; + } + } + if (no_replace) + return; + } + + auto result_function = std::make_shared(getGlobalInFunctionNameForLocalInFunctionName(function_node->getFunctionName())); + result_function->getArguments().getNodes() = std::move(function_node->getArguments().getNodes()); + resolveOrdinaryFunctionNodeByName(*result_function, result_function->getFunctionName(), getContext()); + node = result_function; + } + } + + static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr &) + { + if (auto * function_node = parent->as(); function_node && function_node->getFunctionName().starts_with("global")) + return false; + + return true; + } + + void setRewriteForDistributed(bool rewrite_for_distributed_) { rewrite_for_distributed = rewrite_for_distributed_; } + +private: + bool rewrite_for_distributed = false; +}; + +void rewriteInToGlobalIn(QueryTreeNodePtr & query_tree_to_modify, ContextPtr context, bool rewrite_for_distributed) +{ + RewriteInToGlobalInVisitor visitor(context); + visitor.setRewriteForDistributed(rewrite_for_distributed); + visitor.visit(query_tree_to_modify); +} + } diff --git a/src/Storages/buildQueryTreeForShard.h b/src/Storages/buildQueryTreeForShard.h index 90cbfd36f660..5d8134f4a8cc 100644 --- a/src/Storages/buildQueryTreeForShard.h +++ b/src/Storages/buildQueryTreeForShard.h @@ -16,8 +16,13 @@ using PlannerContextPtr = std::shared_ptr; class Context; using ContextPtr = std::shared_ptr; -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table); +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join = false); void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context); +void rewriteInToGlobalIn(QueryTreeNodePtr & query_tree_to_modify, ContextPtr context, bool rewrite_for_distributed = false); } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 167b7d22708c..d81a1be96aaa 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -913,7 +913,8 @@ def create_namespace(suffix): assert "is filtered by `namespaces` database parameter." in node.query_and_get_error(f"DROP TABLE {CATALOG_NAME}.`{namespace_prefix}alpha.a2.{table_name}`") -def test_cluster_joins(started_cluster): +@pytest.mark.parametrize("join_mode", ["local", "global"]) +def test_cluster_joins(started_cluster, join_mode): node = started_cluster.instances["node1"] test_ref = f"test_join_tables_{uuid.uuid4()}" @@ -981,7 +982,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -998,7 +999,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -1014,7 +1015,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -1031,7 +1032,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -1046,7 +1047,7 @@ def test_cluster_joins(started_cluster): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index b990c0709f08..675b89acd007 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -140,7 +140,7 @@ def started_cluster(): yield cluster finally: - shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/")) + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) cluster.shutdown() @@ -1171,7 +1171,8 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): node.query("SET allow_experimental_analyzer = DEFAULT") -def test_joins(started_cluster): +@pytest.mark.parametrize("join_mode", ["local", "global"]) +def test_joins(started_cluster, join_mode): node = started_cluster.instances["s0_0_0"] # Table join_table only exists on the node 's0_0_0'. @@ -1205,7 +1206,7 @@ def test_joins(started_cluster): join_table AS t2 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1228,7 +1229,7 @@ def test_joins(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1246,7 +1247,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1265,7 +1266,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t2.id % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1283,13 +1284,14 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) AND ((t2.id % 3) == 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + # With WHERE clause with global subquery result6 = node.query( f""" SELECT name FROM @@ -1298,12 +1300,28 @@ def test_joins(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') WHERE value IN (SELECT id FROM join_table) ORDER BY name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result6.splitlines())) assert len(res) == 25 + # With WHERE clause with global subquery + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value GLOBAL IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='{join_mode}'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 + + # With WHERE clause without columns in condition result7 = node.query( f""" SELECT count() FROM @@ -1314,11 +1332,12 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) assert result7.strip() == "625" + # With WHERE clause without columns in condition and with local column in SELECT result8 = node.query( f""" SELECT count(), t2.id FROM @@ -1329,7 +1348,7 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result8.splitlines())) diff --git a/tests/integration/test_storage_iceberg_with_spark/conftest.py b/tests/integration/test_storage_iceberg_with_spark/conftest.py index bbd3883b33f1..08cdda37e98e 100644 --- a/tests/integration/test_storage_iceberg_with_spark/conftest.py +++ b/tests/integration/test_storage_iceberg_with_spark/conftest.py @@ -81,6 +81,7 @@ def started_cluster_iceberg_with_spark(): with_minio=True, with_azurite=True, stay_alive=True, + with_zookeeper=True, ) cluster.add_instance( "node2", @@ -93,6 +94,7 @@ def started_cluster_iceberg_with_spark(): ], user_configs=["configs/users.d/users.xml"], stay_alive=True, + with_zookeeper=True, ) cluster.add_instance( "node3", @@ -105,6 +107,7 @@ def started_cluster_iceberg_with_spark(): ], user_configs=["configs/users.d/users.xml"], stay_alive=True, + with_zookeeper=True, ) logging.info("Starting cluster...") diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py index c04940c1eeb8..82e9d6c3c572 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -6,13 +6,16 @@ execute_spark_query_general, ) +@pytest.mark.parametrize("join_mode", ["local", "global"]) @pytest.mark.parametrize("storage_type", ["s3", "azure"]) -def test_cluster_joins(started_cluster_iceberg_with_spark, storage_type): +def test_cluster_joins(started_cluster_iceberg_with_spark, storage_type, join_mode): instance = started_cluster_iceberg_with_spark.instances["node1"] spark = started_cluster_iceberg_with_spark.spark_session TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() TABLE_NAME_LOCAL = "test_cluster_joins_local_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_SOURCE = "test_cluster_joins_source_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_DISTRIBUTED = "test_cluster_joins_distributed_" + storage_type + "_" + get_uuid_str() def execute_spark_query(query: str, table_name): return execute_spark_query_general( @@ -72,6 +75,10 @@ def execute_spark_query(query: str, table_name): instance.query(f"CREATE TABLE `{TABLE_NAME_LOCAL}` (id Int64, second_name String) ENGINE = Memory()") instance.query(f"INSERT INTO `{TABLE_NAME_LOCAL}` VALUES (1, 'silver'), (2, 'black')") + instance.query(f"CREATE TABLE `{TABLE_NAME_SOURCE}` ON CLUSTER 'cluster_simple' (id Int64, second_name String) ENGINE = Memory()") + instance.query(f"CREATE TABLE `{TABLE_NAME_DISTRIBUTED}` (id Int64, second_name String) ENGINE = Distributed('cluster_simple', currentDatabase(), '{TABLE_NAME_SOURCE}')") + instance.query(f"INSERT INTO `{TABLE_NAME_DISTRIBUTED}` VALUES (1, 'smith'), (2, 'wesson')") + res = instance.query( f""" SELECT t1.name,t2.second_name @@ -81,7 +88,7 @@ def execute_spark_query(query: str, table_name): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -91,14 +98,31 @@ def execute_spark_query(query: str, table_name): f""" SELECT name FROM {creation_expression} - WHERE tag in ( + WHERE tag IN ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag GLOBAL IN ( SELECT id FROM {creation_expression_2} ) ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -114,7 +138,7 @@ def execute_spark_query(query: str, table_name): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -124,14 +148,31 @@ def execute_spark_query(query: str, table_name): f""" SELECT name FROM {creation_expression} - WHERE tag in ( + WHERE tag IN ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag GLOBAL IN ( SELECT id FROM `{TABLE_NAME_LOCAL}` ) ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) @@ -146,8 +187,57 @@ def execute_spark_query(query: str, table_name): ORDER BY ALL SETTINGS object_storage_cluster='cluster_simple', - object_storage_cluster_join_mode='local' + object_storage_cluster_join_mode='{join_mode}' """ ) assert res == "jack\tblack\njack\tsilver\njohn\tblack\njohn\tsilver\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN `{TABLE_NAME_DISTRIBUTED}` AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\twesson\njohn\tsmith\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag GLOBAL IN ( + SELECT id + FROM `{TABLE_NAME_DISTRIBUTED}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag IN ( + SELECT id + FROM `{TABLE_NAME_DISTRIBUTED}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='{join_mode}' + """ + ) + + assert res == "jack\njohn\n"