Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,31 @@ namespace
}
}

/// Appends the same positional converting step that
/// `InterpreterInsertQuery::addInsertToSelectPipeline` would add for an
/// INSERT SELECT: the destination header is `getSampleBlockNonMaterialized()`
/// and every type difference is bridged by the CAST emitted from
/// `makeConvertingActions`. No pre-validation and no per-column lossy/non-lossy
/// classification happen here — the accepted schema pairs are exactly the ones
/// INSERT SELECT accepts.
void addExportConvertingActions(
    QueryPlan & plan_for_part,
    const IStorage & destination_storage,
    const ContextPtr & local_context)
{
    const auto current_header = plan_for_part.getCurrentHeader();

    auto converting_dag = ActionsDAG::makeConvertingActions(
        current_header->getColumnsWithTypeAndName(),
        destination_storage.getInMemoryMetadataPtr()->getSampleBlockNonMaterialized().getColumnsWithTypeAndName(),
        ActionsDAG::MatchColumnsMode::Position,
        local_context);

    auto converting_step = std::make_unique<ExpressionStep>(current_header, std::move(converting_dag));
    converting_step->setStepDescription("Convert source columns to destination types for export");
    plan_for_part.addStep(std::move(converting_step));
}

String buildDestinationFilename(
const MergeTreePartExportManifest & manifest,
const StorageID & storage_id,
Expand Down Expand Up @@ -239,6 +264,10 @@ bool ExportPartTask::executeStep()
/// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns
materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);

/// Align the pipeline header with the destination's non-materialized sample block,
/// using the same `makeConvertingActions(Position)` call INSERT SELECT performs.
addExportConvertingActions(plan_for_part, *destination_storage, local_context);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Convert partition values before creating the import sink

When the destination is partitioned object storage and the partition column's type changes in a castable way (for example, source `year String`, destination `year UInt32`, both `PARTITION BY year`), this new row conversion runs only after `destination_storage->import` has already computed the S3/Hive partition path from `block_with_partition_values`, which is built from the source part's minmax block. As a result, a schema pair that the synchronous validation now accepts can fail in two ways: partition-key evaluation may throw because the destination's partition actions receive source-typed inputs, or files may be written under a path derived from the uncast source value even though the exported rows themselves were cast. Either way the export is not equivalent to INSERT SELECT.

Useful? React with 👍 / 👎.


QueryPlanOptimizationSettings optimization_settings(local_context);
auto pipeline_settings = BuildQueryPipelineSettings(local_context);
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
Expand Down
23 changes: 17 additions & 6 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Common/thread_local_rng.h>
#include <Storages/MergeTree/ExportPartTask.h>
#include <Storages/MergeTree/ExportPartitionUtils.h>
#include <Interpreters/ActionsDAG.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
Expand Down Expand Up @@ -6741,14 +6742,24 @@ void MergeTreeData::exportPartToTable(
#endif
}

const auto & source_columns = source_metadata_ptr->getColumns();
/// Schema compatibility: use the same rules as `INSERT INTO dest SELECT * FROM src` —
/// positional matching with CAST. We surface structural mismatches synchronously at
/// ALTER TABLE time by building (and discarding) the same converting DAG the worker
/// will build later. Anything `makeConvertingActions` rejects (column-count mismatch,
/// untyped cast) fails here; anything it accepts will be CAST at runtime.
{
Block source_sample_block;
for (const auto & column : source_metadata_ptr->getColumns().getReadable())
source_sample_block.insert({column.type->createColumn(), column.type, column.name});

const auto & destination_columns = destination_metadata_ptr->getColumns();
const auto destination_sample_block = destination_metadata_ptr->getSampleBlockNonMaterialized();

/// compare all source readable columns with all destination insertable columns
/// this allows us to skip ephemeral columns
if (source_columns.getReadable().sizeOfDifference(destination_columns.getInsertable()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
(void) ActionsDAG::makeConvertingActions(
source_sample_block.getColumnsWithTypeAndName(),
destination_sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position,
query_context);
}

/// for data lakes this check is performed differently. It is a bit more complex as we need to convert the iceberg partition spec
/// to the MergeTree partition spec and compare the two.
Expand Down
23 changes: 19 additions & 4 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
#include <Storages/MergeTree/PatchParts/PatchPartsLock.h>
#include <Storages/MergeTree/PatchParts/PatchPartsUtils.h>
#include <Storages/MergeTree/ExportPartitionUtils.h>
#include <Interpreters/ActionsDAG.h>

#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseReplicated.h>
Expand Down Expand Up @@ -8316,10 +8317,24 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
auto src_snapshot = getInMemoryMetadataPtr();
auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();

/// compare all source readable columns with all destination insertable columns
/// this allows us to skip ephemeral columns
if (src_snapshot->getColumns().getReadable().sizeOfDifference(destination_snapshot->getColumns().getInsertable()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
/// Schema compatibility: use the same rules as `INSERT INTO dest SELECT * FROM src` —
/// positional matching with CAST. We surface structural mismatches synchronously at
/// ALTER TABLE time by building (and discarding) the same converting DAG the worker
/// will build later. Anything `makeConvertingActions` rejects (column-count mismatch,
/// untyped cast) fails here; anything it accepts will be CAST at runtime.
{
Block source_sample_block;
for (const auto & column : src_snapshot->getColumns().getReadable())
source_sample_block.insert({column.type->createColumn(), column.type, column.name});

const auto destination_sample_block = destination_snapshot->getSampleBlockNonMaterialized();

(void) ActionsDAG::makeConvertingActions(
source_sample_block.getColumnsWithTypeAndName(),
destination_sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position,
query_context);
}

/// for data lakes this check is performed later. It is a bit more complex as we need to convert the iceberg partition spec
/// to the MergeTree partition spec and compare the two.
Expand Down
263 changes: 263 additions & 0 deletions tests/integration/test_export_merge_tree_part_to_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,42 @@ def assert_part_log(node, table: str, part: str) -> None:
)


def wait_for_failed_export_part(
    node,
    table: str,
    part: str,
    timeout: int = 60,
    poll_interval: float = 0.5,
) -> str:
    """Poll system.part_log until a failed ExportPart event appears for *part*.

    Flushes logs and re-queries every *poll_interval* seconds until a row with
    a non-zero ``error`` shows up for (*table*, *part*) or *timeout* seconds
    elapse.

    Returns the exception text recorded on the log entry, which lets callers
    assert on the specific runtime error that propagated from the export worker.

    Raises TimeoutError if no failed event appears; the message includes the
    newest ExportPart row for the part (failed or not) to aid debugging.
    """
    failed_row_query = (
        f"SELECT error, exception FROM system.part_log "
        f"WHERE event_type = 'ExportPart' "
        f"AND database = currentDatabase() "
        f"AND table = '{table}' "
        f"AND part_name = '{part}' "
        f"AND error != 0 "
        f"ORDER BY event_time DESC LIMIT 1"
    )
    deadline = time.time() + timeout
    while time.time() < deadline:
        node.query("SYSTEM FLUSH LOGS")
        row = node.query(failed_row_query).strip()
        if row:
            # The exception text may itself contain tabs, so split only once.
            _error, exception = row.split("\t", 1)
            return exception
        time.sleep(poll_interval)

    # Bug fix: the original recorded `row` in `last_seen` only on iterations
    # where it was empty, so the timeout message always showed ''. Fetch the
    # newest ExportPart row *without* the error filter instead — that shows
    # whether the export actually succeeded or simply never ran.
    last_seen = node.query(
        f"SELECT error, exception FROM system.part_log "
        f"WHERE event_type = 'ExportPart' "
        f"AND database = currentDatabase() "
        f"AND table = '{table}' "
        f"AND part_name = '{part}' "
        f"ORDER BY event_time DESC LIMIT 1"
    ).strip()
    raise TimeoutError(
        f"Failed ExportPart event for part {part!r} in table {table!r} "
        f"did not appear in system.part_log within {timeout}s (last row: {last_seen!r})"
    )


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -479,3 +515,230 @@ def test_export_part_writes_column_statistics(cluster):

node.query(f"DROP TABLE IF EXISTS {mt} SYNC")
node.query(f"DROP TABLE IF EXISTS {iceberg}")


# ---------------------------------------------------------------------------
# Schema compatibility (INSERT SELECT mirror)
#
# EXPORT PART builds an ActionsDAG via makeConvertingActions(Position) — the
# same primitive that powers INSERT INTO dest SELECT * FROM src. The tests
# below pin that contract:
#
# * Column counts must match positionally; otherwise the ALTER is rejected
# synchronously with NUMBER_OF_COLUMNS_DOESNT_MATCH and nothing lands.
# * Destination column names need not match source names — positional pairs
# are CAST element-wise.
# * Castable type differences are accepted at validation time (widening and
# even lossy narrowing). Lossiness only matters at runtime: if the value
# does not fit the destination type, the async export worker fails and the
# failure surfaces via system.part_log; the Iceberg table is untouched.
# ---------------------------------------------------------------------------


def test_export_part_column_count_mismatch_source_more_is_rejected(cluster):
    """
    A 3-column source (id, year, extra) exported into a 2-column destination
    (id, year) must be rejected synchronously: the ALTER itself raises
    NUMBER_OF_COLUMNS_DOESNT_MATCH and no data reaches the Iceberg table.
    """
    instance = cluster.instances["node1"]
    suffix = unique_suffix()
    source = f"mt_count_more_{suffix}"
    dest = f"iceberg_count_more_{suffix}"

    make_mt(instance, source, "id Int32, year Int32, extra String", "year")
    make_iceberg_s3(instance, dest, "id Int32, year Int32", "year")

    instance.query(f"INSERT INTO {source} VALUES (1, 2020, 'foo'), (2, 2020, 'bar')")
    part = get_part(instance, source, "2020")

    alter = (
        f"ALTER TABLE {source} EXPORT PART '{part}' TO TABLE {dest} "
        f"SETTINGS allow_experimental_export_merge_tree_part = 1, "
        f"allow_experimental_insert_into_iceberg = 1"
    )
    err = instance.query_and_get_error(alter)
    assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in err, (
        f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source>dest column count, "
        f"got: {err!r}"
    )

    instance.query(f"DROP TABLE IF EXISTS {source} SYNC")
    instance.query(f"DROP TABLE IF EXISTS {dest}")


def test_export_part_column_count_mismatch_source_fewer_is_rejected(cluster):
    """
    A 2-column source (id, year) exported into a 3-column destination
    (id, year, extra) must be rejected synchronously, mirroring the
    source>dest case: NUMBER_OF_COLUMNS_DOESNT_MATCH at ALTER time.
    """
    instance = cluster.instances["node1"]
    suffix = unique_suffix()
    source = f"mt_count_fewer_{suffix}"
    dest = f"iceberg_count_fewer_{suffix}"

    make_mt(instance, source, "id Int32, year Int32", "year")
    make_iceberg_s3(instance, dest, "id Int32, year Int32, extra String", "year")

    instance.query(f"INSERT INTO {source} VALUES (1, 2020), (2, 2020)")
    part = get_part(instance, source, "2020")

    alter = (
        f"ALTER TABLE {source} EXPORT PART '{part}' TO TABLE {dest} "
        f"SETTINGS allow_experimental_export_merge_tree_part = 1, "
        f"allow_experimental_insert_into_iceberg = 1"
    )
    err = instance.query_and_get_error(alter)
    assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in err, (
        f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source<dest column count, "
        f"got: {err!r}"
    )

    instance.query(f"DROP TABLE IF EXISTS {source} SYNC")
    instance.query(f"DROP TABLE IF EXISTS {dest}")


def test_export_part_with_renamed_destination_column(cluster):
    """
    Destination has the same shape as the source but its first column is named
    `renamed_id` instead of `id`. Positional matching must accept the export,
    and the rows must land under the destination's column name.
    """
    instance = cluster.instances["node1"]
    suffix = unique_suffix()
    source = f"mt_renamed_{suffix}"
    dest = f"iceberg_renamed_{suffix}"

    make_mt(instance, source, "id Int32, year Int32", "year")
    make_iceberg_s3(instance, dest, "renamed_id Int32, year Int32", "year")

    instance.query(f"INSERT INTO {source} VALUES (1, 2020), (2, 2020), (3, 2020)")
    part = get_part(instance, source, "2020")

    export_part(instance, source, part, dest)
    wait_for_export_part(instance, source, part)

    row_count = int(instance.query(f"SELECT count() FROM {dest}").strip())
    assert row_count == 3, f"Expected 3 rows in Iceberg table after export, got {row_count}"

    rows = instance.query(
        f"SELECT renamed_id, year FROM {dest} ORDER BY renamed_id"
    ).strip()
    assert rows == "1\t2020\n2\t2020\n3\t2020", (
        f"Unexpected data under renamed column:\n{rows}"
    )

    assert_part_log(instance, source, part)

    instance.query(f"DROP TABLE IF EXISTS {source} SYNC")
    instance.query(f"DROP TABLE IF EXISTS {dest}")


def test_export_part_with_castable_widening(cluster):
    """
    Source `id` is Int32 while the destination expects Int64. The converting
    DAG inserts a widening CAST, so the export must succeed and the values
    must arrive in Iceberg typed as Int64.
    """
    instance = cluster.instances["node1"]
    suffix = unique_suffix()
    source = f"mt_widen_{suffix}"
    dest = f"iceberg_widen_{suffix}"

    make_mt(instance, source, "id Int32, year Int32", "year")
    make_iceberg_s3(instance, dest, "id Int64, year Int32", "year")

    instance.query(f"INSERT INTO {source} VALUES (1, 2020), (2, 2020)")
    part = get_part(instance, source, "2020")

    export_part(instance, source, part, dest)
    wait_for_export_part(instance, source, part)

    row_count = int(instance.query(f"SELECT count() FROM {dest}").strip())
    assert row_count == 2, f"Expected 2 rows in Iceberg table after export, got {row_count}"

    rows = instance.query(
        f"SELECT id, toTypeName(id), year FROM {dest} ORDER BY id"
    ).strip()
    assert rows == "1\tInt64\t2020\n2\tInt64\t2020", (
        f"Unexpected widened data:\n{rows}"
    )

    assert_part_log(instance, source, part)

    instance.query(f"DROP TABLE IF EXISTS {source} SYNC")
    instance.query(f"DROP TABLE IF EXISTS {dest}")


def test_export_part_with_castable_narrowing_values_fit(cluster):
    """
    Source `id` is Int64 while the destination expects Int32 — a narrowing,
    potentially lossy cast. Mirroring INSERT SELECT it is not blocked, and
    values that fit export successfully as Int32.
    """
    instance = cluster.instances["node1"]
    suffix = unique_suffix()
    source = f"mt_narrow_fit_{suffix}"
    dest = f"iceberg_narrow_fit_{suffix}"

    make_mt(instance, source, "id Int64, year Int32", "year")
    make_iceberg_s3(instance, dest, "id Int32, year Int32", "year")

    instance.query(f"INSERT INTO {source} VALUES (1, 2020), (2, 2020)")
    part = get_part(instance, source, "2020")

    export_part(instance, source, part, dest)
    wait_for_export_part(instance, source, part)

    row_count = int(instance.query(f"SELECT count() FROM {dest}").strip())
    assert row_count == 2, f"Expected 2 rows in Iceberg table after export, got {row_count}"

    rows = instance.query(
        f"SELECT id, toTypeName(id), year FROM {dest} ORDER BY id"
    ).strip()
    assert rows == "1\tInt32\t2020\n2\tInt32\t2020", (
        f"Unexpected narrowed data:\n{rows}"
    )

    assert_part_log(instance, source, part)

    instance.query(f"DROP TABLE IF EXISTS {source} SYNC")
    instance.query(f"DROP TABLE IF EXISTS {dest}")


def test_export_part_runtime_cast_failure_propagates_async(cluster):
    """
    A String source value that cannot be parsed into the destination's Int32
    column: String -> Int32 is a defined conversion, so validation accepts the
    pair and the ALTER returns synchronously. The cast itself runs in the
    async export worker, fails there, and the failure must surface through
    system.part_log (non-zero `error` plus non-empty `exception` text) while
    the Iceberg table stays empty.

    (Integer narrowing overflow is deliberately not used here: the internal
    cast built by `makeConvertingActions` is `CastType::nonAccurate`, which
    wraps on overflow instead of throwing — that would yield a successful
    export with garbage data rather than a propagated failure.)
    """
    instance = cluster.instances["node1"]
    suffix = unique_suffix()
    source = f"mt_runtime_cast_fail_{suffix}"
    dest = f"iceberg_runtime_cast_fail_{suffix}"

    make_mt(instance, source, "id String, year Int32", "year")
    make_iceberg_s3(instance, dest, "id Int32, year Int32", "year")

    instance.query(f"INSERT INTO {source} VALUES ('not a number', 2020)")
    part = get_part(instance, source, "2020")

    export_part(instance, source, part, dest)

    exception = wait_for_failed_export_part(instance, source, part)
    assert exception, (
        f"Expected non-empty exception text on failed ExportPart entry, got {exception!r}"
    )

    row_count = int(instance.query(f"SELECT count() FROM {dest}").strip())
    assert row_count == 0, (
        f"Expected 0 rows in Iceberg table after failed export, got {row_count}"
    )

    instance.query(f"DROP TABLE IF EXISTS {source} SYNC")
    instance.query(f"DROP TABLE IF EXISTS {dest}")
Loading
Loading