diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py
index af6ca75bafd7..67ff3f802001 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py
@@ -932,3 +932,143 @@ def test_export_partition_writes_column_statistics(cluster):
     entries = fetch_manifest_entries(node, query_id)
     assert_exported_stats(entries)
+
+
+def test_export_50_monthly_partitions_to_iceberg_two_replicas(cluster):
+    """
+    Reproduction of the QA scenario reported against EXPORT PARTITION on a
+    two-replica ReplicatedMergeTree. Each partition is exported with its own
+    ALTER TABLE statement, which mirrors how QA drove the failure (every
+    statement reports success but the destination row count does not match).
+
+    Schema mirrors QA:
+    - ``event_month`` is a MATERIALIZED Int32 derived from ``event_time``,
+      and is the partition expression.
+    - Wide row layout with a SHA256-derived ``payload`` column (64 hex
+      characters) so each row carries a realistic amount of data.
+
+    Data layout mirrors QA:
+    - 10M rows distributed deterministically across 50 monthly partitions
+      (200k rows per partition, partition IDs ``201501``..``201902``).
+    - Insert uses ``max_insert_threads = 8`` with no ``max_insert_block_size``
+      override, so each partition typically ends up with multiple parts
+      (instead of exactly one part per partition as in the prior version of
+      this test, which made the bug invisible).
+
+    Export pattern mirrors QA:
+    - 50 separate ``ALTER TABLE ... EXPORT PARTITION ID '<partition_id>'``
+      statements, NOT one ALTER bundling 50 clauses.
+    """
+    replica1 = cluster.instances["replica1"]
+    replica2 = cluster.instances["replica2"]
+
+    uid = unique_suffix()
+    mt_table = f"mt_qa_{uid}"
+    iceberg_table = f"iceberg_qa_{uid}"
+
+    # Source RMT schema (with MATERIALIZED partition column, matching QA).
+    rmt_cols = (
+        "event_month Int32 MATERIALIZED toInt32(toYYYYMM(toDateTime(event_time))), "
+        "id Int64, "
+        "event_time DateTime64(6), "
+        "user_id Int32, "
+        "category String, "
+        "value Float64, "
+        "payload String"
+    )
+    # Iceberg destination has the same columns but no MATERIALIZED expression.
+    iceberg_cols = (
+        "event_month Int32, "
+        "id Int64, "
+        "event_time DateTime64(6), "
+        "user_id Int32, "
+        "category String, "
+        "value Float64, "
+        "payload String"
+    )
+
+    make_rmt(replica1, mt_table, rmt_cols, "event_month",
+             replica_name="replica1", order_by="(event_time, id)")
+    make_rmt(replica2, mt_table, rmt_cols, "event_month",
+             replica_name="replica2", order_by="(event_time, id)")
+
+    make_iceberg_s3(replica1, iceberg_table, iceberg_cols, "event_month")
+    make_iceberg_s3(replica2, iceberg_table, iceberg_cols, "event_month",
+                    if_not_exists=True)
+
+    num_partitions = 50
+    rows_per_partition = 200_000
+    total_rows = num_partitions * rows_per_partition  # 10,000,000
+
+    # Insert all rows on replica1 only. Each row's event_time selects which
+    # monthly partition it lands in (rows 0..199999 -> 2015-01, etc.). No
+    # max_insert_block_size override is set, so the insert is split into
+    # multiple parts per partition under max_insert_threads = 8.
+    replica1.query(
+        f"""
+        INSERT INTO {mt_table} (id, event_time, user_id, category, value, payload)
+        SELECT
+            n AS id,
+            toDateTime64(
+                toStartOfMonth(toDateTime('2015-01-01 00:00:00'))
+                    + toIntervalMonth(intDiv(n, {rows_per_partition}))
+                    + toIntervalSecond(n % {rows_per_partition}),
+                6
+            ) AS event_time,
+            toInt32((n * 48271) % 2147483647) AS user_id,
+            concat('c', toString(n % 10)) AS category,
+            (n % 1000000) / 1000. AS value,
+            lower(hex(SHA256(toString(n)))) AS payload
+        FROM (SELECT number AS n FROM numbers({total_rows}))
+        SETTINGS max_insert_threads = 8
+        """
+    )
+
+    inserted = int(replica1.query(f"SELECT count() FROM {mt_table}").strip())
+    assert inserted == total_rows, (
+        f"Expected {total_rows} rows in source MergeTree, got {inserted}"
+    )
+
+    # Make sure the second replica has the data before issuing exports.
+    replica2.query(f"SYSTEM SYNC REPLICA {mt_table}", timeout=600)
+
+    # Build the 50 monthly partition IDs starting from 201501.
+    partition_ids = [
+        f"{2015 + i // 12}{(i % 12) + 1:02d}" for i in range(num_partitions)
+    ]
+
+    # Issue 50 SEPARATE ALTER TABLE statements (mirroring QA's reproduction)
+    # concatenated into one semicolon-separated script and sent in a single
+    # query call. Bundling them into a single ALTER with comma-separated
+    # clauses hides the issue -- the bug only shows up reliably when each
+    # partition is its own statement.
+    export_script = "\n".join(
+        f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table};"
+        for pid in partition_ids
+    )
+    replica1.query(export_script)
+
+    for pid in partition_ids:
+        wait_for_export_status(
+            replica1, mt_table, iceberg_table, pid,
+            expected_status="COMPLETED", timeout=600,
+        )
+
+    count = int(replica1.query(f"SELECT count() FROM {iceberg_table}").strip())
+    assert count == total_rows, (
+        f"Expected {total_rows} rows in Iceberg table after export, got {count}"
+    )
+
+    # Per-partition row count must also match (200k rows each). Mismatches at
+    # this granularity are what QA actually observed.
+    mismatches = []
+    for pid in partition_ids:
+        per_part = int(replica1.query(
+            f"SELECT count() FROM {iceberg_table} WHERE event_month = {int(pid)}"
+        ).strip())
+        if per_part != rows_per_partition:
+            mismatches.append((pid, per_part))
+    assert not mismatches, (
+        f"Per-partition row counts do not match (expected {rows_per_partition} "
+        f"rows per partition): {mismatches}"
+    )