Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1969,7 +1969,7 @@ ClickHouse applies this setting when the query contains the product of object st
Possible values:

- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` The right table is executed first and is added to the secondary query as a temporary table.
- `allow` — Default value. Allows the use of these types of subqueries.
)", 0) \
\
Expand Down
88 changes: 68 additions & 20 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Poco/URI.h>
Expand Down Expand Up @@ -110,11 +112,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, size_t entry_, ContextPtr context)
: Base(context)
, types(types_)
, entry(entry_) {}

bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/)
{
return getSubqueryDepth() <= 2 && !passed_node;
return getSubqueryDepth() <= 2 && !passed_node && !current_entry;
}

void enterImpl(QueryTreeNodePtr & node)
Expand All @@ -125,13 +130,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
auto node_type = node->getNodeType();

if (types.contains(node_type))
passed_node = node;
{
++current_entry;
if (current_entry == entry)
passed_node = node;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }

private:
std::unordered_set<QueryTreeNodeType> types;
size_t entry;
size_t current_entry = 0;
QueryTreeNodePtr passed_node;
};

Expand Down Expand Up @@ -198,33 +209,42 @@ Converts
localtable as t
ON s3.key == t.key

to
to (object_storage_cluster_join_mode='local')

SELECT s3.c1, s3.c2, s3.key
FROM
s3Cluster(...) AS s3

or (object_storage_cluster_join_mode='global')

SELECT s3.c1, s3.c2, t.c3
FROM
s3Cluster(...) as s3
JOIN
values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t
ON s3.key == t.key
*/
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
ASTPtr & query_to_send,
QueryTreeNodePtr query_tree,
SelectQueryInfo query_info,
const ContextPtr & context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
switch (object_storage_cluster_join_mode)
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto info = getQueryTreeInfo(query_tree, context);
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_tree->clone();
auto modified_query_tree = query_info.query_tree->clone();

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(modified_query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node");

QueryTreeNodePtr query_tree_distributed;

Expand All @@ -237,7 +257,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
}
else if (info.has_cross_join)
{
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context);
join_searcher.visit(modified_query_tree);
auto cross_join_node = join_searcher.getNode();
if (!cross_join_node)
Expand Down Expand Up @@ -292,8 +312,28 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
// TODO
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
{
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_info.query_tree->clone();

rewriteJoinToGlobalJoin(modified_query_tree, context);

if (info.has_local_columns_in_where)
rewriteInToGlobalIn(modified_query_tree, context);

modified_query_tree = buildQueryTreeForShard(
Comment on lines +318 to +327
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Rewrite non-GLOBAL IN predicates in global join mode

When object_storage_cluster_join_mode='global', this branch also runs for has_local_columns_in_where, but the rewrite path only calls rewriteJoinToGlobalJoin (JOIN-only) before buildQueryTreeForShard. For queries like ... WHERE value IN (SELECT id FROM local_table) (no JOIN/CROSS JOIN), the IN remains non-GLOBAL, so the subquery still executes on shards and fails if local_table exists only on the initiator. This is a correctness regression in the newly added global mode path because it now claims to handle local-column WHERE cases but does not transform plain IN subqueries.

Useful? React with 👍 / 👎.

query_info.planner_context,
modified_query_tree,
/*allow_global_join_for_right_table*/ false,
/*find_cross_join*/ true);
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
return;
}
Expand Down Expand Up @@ -327,7 +367,7 @@ void IStorageCluster::read(
/// rewrite query to execute `remote('remote_host', s3(...))`
/// remote_host can execute the query itself or run an on-cluster query, depending on its own `object_storage_cluster` setting
updateConfigurationIfNeeded(context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context);
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context, /*make_cluster_function*/ false);

auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
Expand Down Expand Up @@ -356,7 +396,7 @@ void IStorageCluster::read(

SharedHeader sample_block;

updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context);

if (settings[Setting::allow_experimental_analyzer])
{
Expand Down Expand Up @@ -408,6 +448,10 @@ void IStorageCluster::read(

auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this());

std::optional<Tables> external_tables = std::nullopt;
if (query_info.planner_context && query_info.planner_context->getMutableQueryContext())
external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables();

auto reading = std::make_unique<ReadFromCluster>(
column_names,
query_info,
Expand All @@ -418,7 +462,8 @@ void IStorageCluster::read(
std::move(query_to_send),
processed_stage,
cluster,
log);
log,
external_tables);

query_plan.addStep(std::move(reading));
}
Expand Down Expand Up @@ -558,7 +603,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
external_tables.has_value() ? *external_tables : Tables(),
processed_stage,
nullptr,
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)},
Expand Down Expand Up @@ -597,7 +642,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt
info.has_cross_join = true;
}

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
Expand Down Expand Up @@ -632,9 +677,12 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL)
{
auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}
}

/// Initiator executes query on remote node.
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class IStorageCluster : public IStorage
const StorageSnapshotPtr & /*storage_snapshot*/,
const ContextPtr & /*context*/,
bool /*make_cluster_function*/) {}
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context);

virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}

Expand Down Expand Up @@ -141,7 +141,8 @@ class ReadFromCluster : public SourceStepWithFilter
ASTPtr query_to_send_,
QueryProcessingStage::Enum processed_stage_,
ClusterPtr cluster_,
LoggerPtr log_)
LoggerPtr log_,
std::optional<Tables> external_tables_)
: SourceStepWithFilter(
std::move(sample_block),
column_names_,
Expand All @@ -153,6 +154,7 @@ class ReadFromCluster : public SourceStepWithFilter
, processed_stage(processed_stage_)
, cluster(std::move(cluster_))
, log(log_)
, external_tables(external_tables_)
{
}

Expand All @@ -164,6 +166,7 @@ class ReadFromCluster : public SourceStepWithFilter
LoggerPtr log;

std::optional<RemoteQueryExecutor::Extension> extension;
std::optional<Tables> external_tables;

void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
Expand Down
52 changes: 1 addition & 51 deletions src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -956,53 +956,6 @@ class ReplaceColumnNodesForTableExpressionVisitor : public InDepthQueryTreeVisit
const ColumnNameToColumnNodeMap & column_name_to_node;
};

/// Query tree visitor that replaces local `IN`-family function nodes with their `GLOBAL` counterparts.
/// A node is rewritten only when its second argument is a subquery and at least one table expression
/// extracted from that subquery's join tree is not backed by StorageDistributed.
class RewriteInToGlobalInVisitor : public InDepthQueryTreeVisitorWithContext<RewriteInToGlobalInVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<RewriteInToGlobalInVisitor>;
using Base::Base;

/// Called for every visited node; may replace `node` in place with a newly built function node.
void enterImpl(QueryTreeNodePtr & node)
{
/// Only function nodes whose name is accepted by isNameOfLocalInFunction are candidates.
if (auto * function_node = node->as<FunctionNode>(); function_node && isNameOfLocalInFunction(function_node->getFunctionName()))
{
/// The second argument is the right-hand side; if it is not a QueryNode (subquery), leave the node untouched.
auto * query = function_node->getArguments().getNodes()[1]->as<QueryNode>();
if (!query)
return;
/// `no_replace` stays true only if every extracted table expression is backed by StorageDistributed
/// (or if no table expressions were extracted at all).
bool no_replace = true;
for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true))
{
const StorageDistributed * storage_distributed = nullptr;
if (const TableNode * table_node_typed = table_node->as<TableNode>())
storage_distributed = typeid_cast<const StorageDistributed *>(table_node_typed->getStorage().get());
else if (const TableFunctionNode * table_function_node_typed = table_node->as<TableFunctionNode>())
storage_distributed = typeid_cast<const StorageDistributed *>(table_function_node_typed->getStorage().get());

/// The first table expression that is not a StorageDistributed is enough to require the rewrite.
if (!storage_distributed)
{
no_replace = false;
break;
}
}
if (no_replace)
return;

/// Build the replacement function under the name returned by getGlobalInFunctionNameForLocalInFunctionName,
/// move the original arguments into it, resolve it by name, and only then swap it in for `node`.
/// The original function node's argument list has been moved out after this point.
auto result_function = std::make_shared<FunctionNode>(getGlobalInFunctionNameForLocalInFunctionName(function_node->getFunctionName()));
result_function->getArguments().getNodes() = std::move(function_node->getArguments().getNodes());
resolveOrdinaryFunctionNodeByName(*result_function, result_function->getFunctionName(), getContext());
node = result_function;
}
}

/// Do not descend into children of function nodes whose name starts with "global".
static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr &)
{
if (auto * function_node = parent->as<FunctionNode>(); function_node && function_node->getFunctionName().starts_with("global"))
return false;

return true;
}
};

bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree)
{
bool rewrite = false;
Expand Down Expand Up @@ -1163,10 +1116,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
{
auto & query_node = query_tree_to_modify->as<QueryNode&>();
if (query_node.hasWhere())
{
RewriteInToGlobalInVisitor visitor(query_context);
visitor.visit(query_node.getWhere());
}
rewriteInToGlobalIn(query_node.getWhere(), query_context);

rewriteJoinToGlobalJoinIfNeeded(query_node.getJoinTree());
}
Expand Down
Loading
Loading