From 99002b9740d7420ac445bf51bdbac2edd17a9585 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 11 May 2026 16:37:40 -0300 Subject: [PATCH] cast like insert select --- src/Storages/MergeTree/ExportPartTask.cpp | 29 ++ src/Storages/MergeTree/MergeTreeData.cpp | 23 +- src/Storages/StorageReplicatedMergeTree.cpp | 23 +- .../test.py | 263 ++++++++++++++++++ ...rge_tree_part_to_object_storage_simple.sql | 9 +- ...rge_tree_part_to_object_storage_simple.sql | 4 +- 6 files changed, 337 insertions(+), 14 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index 1014b9df7422..40cf0e269de1 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -99,6 +99,31 @@ namespace } } + /// Mirrors `InterpreterInsertQuery::addInsertToSelectPipeline`: positional match, + /// destination header = `getSampleBlockNonMaterialized()`, all type bridging is done + /// by the CAST inside `makeConvertingActions`. No pre-validation, no per-column + /// lossy/non-lossy classification — restrictions are exactly what INSERT SELECT enforces. + void addExportConvertingActions( + QueryPlan & plan_for_part, + const IStorage & destination_storage, + const ContextPtr & local_context) + { + const auto destination_header + = destination_storage.getInMemoryMetadataPtr()->getSampleBlockNonMaterialized(); + + auto dag = ActionsDAG::makeConvertingActions( + plan_for_part.getCurrentHeader()->getColumnsWithTypeAndName(), + destination_header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position, + local_context); + + auto expression_step = std::make_unique( + plan_for_part.getCurrentHeader(), + std::move(dag)); + expression_step->setStepDescription("Convert source columns to destination types for export"); + plan_for_part.addStep(std::move(expression_step)); + } + String buildDestinationFilename( const MergeTreePartExportManifest & manifest, const StorageID & storage_id, @@ -239,6 +264,10 @@ bool ExportPartTask::executeStep() /// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part); + /// Align the pipeline header with the destination's non-materialized sample block, + /// using the same `makeConvertingActions(Position)` call INSERT SELECT performs. + addExportConvertingActions(plan_for_part, *destination_storage, local_context); + QueryPlanOptimizationSettings optimization_settings(local_context); auto pipeline_settings = BuildQueryPipelineSettings(local_context); auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 3e1e5e87d62e..a16ad82e26ba 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -6741,14 +6742,24 @@ void MergeTreeData::exportPartToTable( #endif } - const auto & source_columns = source_metadata_ptr->getColumns(); + /// Schema compatibility: use the same rules as `INSERT INTO dest SELECT * FROM src` — + /// positional matching with CAST. We surface structural mismatches synchronously at + /// ALTER TABLE time by building (and discarding) the same converting DAG the worker + /// will build later. 
Anything `makeConvertingActions` rejects (column-count mismatch, + /// untyped cast) fails here; anything it accepts will be CAST at runtime. + { + Block source_sample_block; + for (const auto & column : source_metadata_ptr->getColumns().getReadable()) + source_sample_block.insert({column.type->createColumn(), column.type, column.name}); - const auto & destination_columns = destination_metadata_ptr->getColumns(); + const auto destination_sample_block = destination_metadata_ptr->getSampleBlockNonMaterialized(); - /// compare all source readable columns with all destination insertable columns - /// this allows us to skip ephemeral columns - if (source_columns.getReadable().sizeOfDifference(destination_columns.getInsertable())) - throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + (void) ActionsDAG::makeConvertingActions( + source_sample_block.getColumnsWithTypeAndName(), + destination_sample_block.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position, + query_context); + } /// for data lakes this check is performed differently. It is a bit more complex as we need to convert the iceberg partition spec /// to the MergeTree partition spec and compare the two. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ee4e7811378a..d6bb75352c5e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -79,6 +79,7 @@ #include #include #include +#include #include #include @@ -8316,10 +8317,24 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & auto src_snapshot = getInMemoryMetadataPtr(); auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); - /// compare all source readable columns with all destination insertable columns - /// this allows us to skip ephemeral columns - if (src_snapshot->getColumns().getReadable().sizeOfDifference(destination_snapshot->getColumns().getInsertable())) - throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + /// Schema compatibility: use the same rules as `INSERT INTO dest SELECT * FROM src` — + /// positional matching with CAST. We surface structural mismatches synchronously at + /// ALTER TABLE time by building (and discarding) the same converting DAG the worker + /// will build later. Anything `makeConvertingActions` rejects (column-count mismatch, + /// untyped cast) fails here; anything it accepts will be CAST at runtime. + { + Block source_sample_block; + for (const auto & column : src_snapshot->getColumns().getReadable()) + source_sample_block.insert({column.type->createColumn(), column.type, column.name}); + + const auto destination_sample_block = destination_snapshot->getSampleBlockNonMaterialized(); + + (void) ActionsDAG::makeConvertingActions( + source_sample_block.getColumnsWithTypeAndName(), + destination_sample_block.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position, + query_context); + } /// for data lakes this check is performed later. It is a bit more complex as we need to convert the iceberg partition spec /// to the MergeTree partition spec and compare the two. 
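For reference, the INSERT SELECT behavior this check mirrors can be exercised directly in SQL; the tables below are illustrative only and are not part of the patch:

    -- Positional matching with CAST: destination names and exact types need not match the source.
    CREATE TABLE src (id Int32, year Int32) ENGINE = MergeTree ORDER BY id;
    CREATE TABLE dst (key Int64, y Int16)  ENGINE = MergeTree ORDER BY key;
    INSERT INTO src VALUES (1, 2020);
    INSERT INTO dst SELECT * FROM src;             -- ok: (id, year) are CAST to (key, y) by position
    INSERT INTO dst SELECT id, year, 42 FROM src;  -- fails: column counts must match positionally

EXPORT PART now follows the same rule, and a count mismatch like the last statement is what the new ALTER-time check reports as NUMBER_OF_COLUMNS_DOESNT_MATCH.
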
diff --git a/tests/integration/test_export_merge_tree_part_to_iceberg/test.py b/tests/integration/test_export_merge_tree_part_to_iceberg/test.py index b371727a08cd..9dc3b2f8434e 100644 --- a/tests/integration/test_export_merge_tree_part_to_iceberg/test.py +++ b/tests/integration/test_export_merge_tree_part_to_iceberg/test.py @@ -121,6 +121,42 @@ def assert_part_log(node, table: str, part: str) -> None: ) +def wait_for_failed_export_part( + node, + table: str, + part: str, + timeout: int = 60, + poll_interval: float = 0.5, +) -> str: + """Poll system.part_log until a failed ExportPart event appears for *part*. + + Returns the exception text recorded on the log entry, which lets callers + assert on the specific runtime error that propagated from the export worker. + """ + deadline = time.time() + timeout + last_seen = "" + while time.time() < deadline: + node.query("SYSTEM FLUSH LOGS") + row = node.query( + f"SELECT error, exception FROM system.part_log " + f"WHERE event_type = 'ExportPart' " + f"AND database = currentDatabase() " + f"AND table = '{table}' " + f"AND part_name = '{part}' " + f"AND error != 0 " + f"ORDER BY event_time DESC LIMIT 1" + ).strip() + if row: + _error, exception = row.split("\t", 1) + return exception + last_seen = row + time.sleep(poll_interval) + raise TimeoutError( + f"Failed ExportPart event for part {part!r} in table {table!r} " + f"did not appear in system.part_log within {timeout}s (last row: {last_seen!r})" + ) + + # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- @@ -479,3 +515,230 @@ def test_export_part_writes_column_statistics(cluster): node.query(f"DROP TABLE IF EXISTS {mt} SYNC") node.query(f"DROP TABLE IF EXISTS {iceberg}") + + +# --------------------------------------------------------------------------- +# Schema compatibility (INSERT SELECT mirror) +# +# EXPORT PART builds an ActionsDAG via makeConvertingActions(Position) — the +# same primitive that powers INSERT INTO dest SELECT * FROM src. The tests +# below pin that contract: +# +# * Column counts must match positionally; otherwise the ALTER is rejected +# synchronously with NUMBER_OF_COLUMNS_DOESNT_MATCH and nothing lands. +# * Destination column names need not match source names — positional pairs +# are CAST element-wise. +# * Castable type differences are accepted at validation time (widening and +# even lossy narrowing). Lossiness only matters at runtime: if the value +# does not fit the destination type, the async export worker fails and the +# failure surfaces via system.part_log; the Iceberg table is untouched. +# --------------------------------------------------------------------------- + + +def test_export_part_column_count_mismatch_source_more_is_rejected(cluster): + """ + Source has 3 columns (id, year, extra), destination has 2 (id, year). + The ALTER must be rejected synchronously with NUMBER_OF_COLUMNS_DOESNT_MATCH + and the Iceberg table must remain empty. 
+ """ + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_count_more_{sfx}" + iceberg = f"iceberg_count_more_{sfx}" + + make_mt(node, mt, "id Int32, year Int32, extra String", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32", "year") + + node.query(f"INSERT INTO {mt} VALUES (1, 2020, 'foo'), (2, 2020, 'bar')") + part_2020 = get_part(node, mt, "2020") + + error = node.query_and_get_error( + f"ALTER TABLE {mt} EXPORT PART '{part_2020}' TO TABLE {iceberg} " + f"SETTINGS allow_experimental_export_merge_tree_part = 1, " + f"allow_experimental_insert_into_iceberg = 1" + ) + assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in error, ( + f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source>dest column count, " + f"got: {error!r}" + ) + + node.query(f"DROP TABLE IF EXISTS {mt} SYNC") + node.query(f"DROP TABLE IF EXISTS {iceberg}") + + +def test_export_part_column_count_mismatch_source_fewer_is_rejected(cluster): + """ + Source has 2 columns (id, year), destination has 3 (id, year, extra). + Same expected synchronous rejection as the source>dest case. + """ + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_count_fewer_{sfx}" + iceberg = f"iceberg_count_fewer_{sfx}" + + make_mt(node, mt, "id Int32, year Int32", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32, extra String", "year") + + node.query(f"INSERT INTO {mt} VALUES (1, 2020), (2, 2020)") + part_2020 = get_part(node, mt, "2020") + + error = node.query_and_get_error( + f"ALTER TABLE {mt} EXPORT PART '{part_2020}' TO TABLE {iceberg} " + f"SETTINGS allow_experimental_export_merge_tree_part = 1, " + f"allow_experimental_insert_into_iceberg = 1" + ) + assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in error, ( + f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source Int32 is a + defined conversion) so the ALTER returns synchronously; the actual cast + runs in the async export worker and must fail there. The failure is + propagated to system.part_log (non-zero `error` column with a non-empty + `exception` text) and the Iceberg table is left empty. + + (Integer narrowing overflow is not used here because the internal cast + `makeConvertingActions` builds uses `CastType::nonAccurate`, which wraps + on overflow rather than throwing — that would still produce a successful + export with garbage data, not a propagated failure.) 
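+
+    The wrap-versus-throw difference of the non-accurate CAST can be seen
+    directly in SQL (the values here are illustrative):
+
+        SELECT CAST(300 AS Int8);          -- 44: wraps silently
+        SELECT accurateCast(300, 'Int8');  -- throws: value out of range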
+ """ + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_runtime_cast_fail_{sfx}" + iceberg = f"iceberg_runtime_cast_fail_{sfx}" + + make_mt(node, mt, "id String, year Int32", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32", "year") + + node.query(f"INSERT INTO {mt} VALUES ('not a number', 2020)") + part_2020 = get_part(node, mt, "2020") + + export_part(node, mt, part_2020, iceberg) + + exception = wait_for_failed_export_part(node, mt, part_2020) + assert exception, ( + f"Expected non-empty exception text on failed ExportPart entry, got {exception!r}" + ) + + count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) + assert count == 0, ( + f"Expected 0 rows in Iceberg table after failed export, got {count}" + ) + + node.query(f"DROP TABLE IF EXISTS {mt} SYNC") + node.query(f"DROP TABLE IF EXISTS {iceberg}") diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql index 7cb70af024a2..ceb83191f93a 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql @@ -9,10 +9,12 @@ CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTIT INSERT INTO 03572_mt_table VALUES (1, 2020); -- Create a table with a different partition key and export a partition to it. It should throw +-- on the partition-key AST mismatch (schema compat now follows INSERT SELECT positional semantics, +-- so the column shape matches and the partition-key check is what fires). CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table -SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} DROP TABLE 03572_invalid_schema_table; @@ -27,13 +29,14 @@ ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION extractKey -- It is a table function, but the engine does not support exports/imports, should throw ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION url('a.parquet'); -- {serverError NOT_IMPLEMENTED} --- Test that destination table can not have a column that matches the source ephemeral +-- Source-side ephemeral columns are not readable, so the destination must not declare a matching +-- ordinary column or the column count will not align under positional matching. 
CREATE TABLE 03572_ephemeral_mt_table (id UInt64, year UInt16, name String EPHEMERAL) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); CREATE TABLE 03572_matching_ephemeral_s3_table (id UInt64, year UInt16, name String) ENGINE = S3(s3_conn, filename='03572_matching_ephemeral_s3_table', format='Parquet', partition_strategy='hive') PARTITION BY year; INSERT INTO 03572_ephemeral_mt_table (id, year, name) VALUES (1, 2020, 'alice'); -ALTER TABLE 03572_ephemeral_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_matching_ephemeral_s3_table; -- {serverError INCOMPATIBLE_COLUMNS} +ALTER TABLE 03572_ephemeral_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_matching_ephemeral_s3_table; -- {serverError NUMBER_OF_COLUMNS_DOESNT_MATCH} DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table, 03572_ephemeral_mt_table, 03572_matching_ephemeral_s3_table; diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql index f8f23532f0a7..0f6647861e43 100644 --- a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql @@ -7,10 +7,12 @@ CREATE TABLE 03572_rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTr INSERT INTO 03572_rmt_table VALUES (1, 2020); -- Create a table with a different partition key and export a partition to it. It should throw +-- on the partition-key AST mismatch (schema compat now follows INSERT SELECT positional semantics, +-- so the column shape matches and the partition-key check is what fires). CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table -SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} DROP TABLE 03572_invalid_schema_table;
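
The runtime-failure path exercised by the integration test can also be inspected by hand. The query below mirrors what wait_for_failed_export_part polls for; the table name is a placeholder, not one used by the patch:

    SYSTEM FLUSH LOGS;
    SELECT event_time, part_name, error, exception
    FROM system.part_log
    WHERE event_type = 'ExportPart'
      AND database = currentDatabase()
      AND table = 'mt_runtime_cast_fail_example'  -- placeholder: the exported table's name
      AND error != 0
    ORDER BY event_time DESC
    LIMIT 1;

A non-zero error with a non-empty exception text confirms that the export worker failed after the ALTER already returned, while the destination Iceberg table stays empty.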