From f4ece4e32e6b10ae02dc1199ee6d039fd8a3fe26 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 11 May 2026 16:46:48 -0300 Subject: [PATCH 1/2] add a test that tries to replicate the behavior julian is seeing --- .../test.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index af6ca75bafd7..9269612b0543 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -932,3 +932,77 @@ def test_export_partition_writes_column_statistics(cluster): entries = fetch_manifest_entries(node, query_id) assert_exported_stats(entries) + + +def test_export_50_partitions_in_single_alter_to_iceberg(cluster): + """ + Single-statement export of 50 partitions to IcebergS3 (no catalog). + + Creates a 5-column ReplicatedMergeTree table on two replicas, inserts 10M + rows into the first replica laid out as 50 partitions of 200k rows each, + then issues a single ALTER TABLE that bundles 50 EXPORT PARTITION clauses + pointing at the same IcebergS3 destination. Waits for every partition to + reach COMPLETED in system.replicated_partition_exports and asserts the + total row count round-trips end-to-end. 
+ """ + replica1 = cluster.instances["replica1"] + replica2 = cluster.instances["replica2"] + + uid = unique_suffix() + mt_table = f"mt_many_{uid}" + iceberg_table = f"iceberg_many_{uid}" + + cols = "id Int64, name String, value Float64, event_dt Date, part_id Int32" + + make_rmt(replica1, mt_table, cols, "part_id", + replica_name="replica1", order_by="id") + make_rmt(replica2, mt_table, cols, "part_id", + replica_name="replica2", order_by="id") + + make_iceberg_s3(replica1, iceberg_table, cols, "part_id") + make_iceberg_s3(replica2, iceberg_table, cols, "part_id", + if_not_exists=True) + + num_partitions = 50 + rows_per_partition = 200_000 + total_rows = num_partitions * rows_per_partition + + # Insert all rows on the first replica only. max_insert_block_size is set to + # the full row count so the writer accumulates one block before splitting it + # by partition, producing exactly one part per partition_id. + replica1.query( + f""" + INSERT INTO {mt_table} + SELECT + number AS id, + toString(number) AS name, + toFloat64(number) * 1.5 AS value, + toDate('2024-01-01') + (number % 365) AS event_dt, + toInt32(number % {num_partitions}) AS part_id + FROM numbers({total_rows}) + SETTINGS max_insert_block_size = {total_rows}, + min_insert_block_size_rows = {total_rows} + """ + ) + + inserted = int(replica1.query(f"SELECT count() FROM {mt_table}").strip()) + assert inserted == total_rows, ( + f"Expected {total_rows} rows in source MergeTree, got {inserted}" + ) + + export_clauses = ", ".join( + f"EXPORT PARTITION ID '{p}' TO TABLE {iceberg_table}" + for p in range(num_partitions) + ) + replica1.query(f"ALTER TABLE {mt_table} {export_clauses}") + + for p in range(num_partitions): + wait_for_export_status( + replica1, mt_table, iceberg_table, str(p), + expected_status="COMPLETED", timeout=600, + ) + + count = int(replica1.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == total_rows, ( + f"Expected {total_rows} rows in Iceberg table after export, got 
{count}" + ) From 2620bed87385b77385e4a89e4ec4023ec3b7464b Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 11 May 2026 17:32:11 -0300 Subject: [PATCH 2/2] try to match qa --- .../test.py | 142 +++++++++++++----- 1 file changed, 104 insertions(+), 38 deletions(-) diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 9269612b0543..67ff3f802001 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -934,54 +934,93 @@ def test_export_partition_writes_column_statistics(cluster): assert_exported_stats(entries) -def test_export_50_partitions_in_single_alter_to_iceberg(cluster): +def test_export_50_monthly_partitions_to_iceberg_two_replicas(cluster): """ - Single-statement export of 50 partitions to IcebergS3 (no catalog). - - Creates a 5-column ReplicatedMergeTree table on two replicas, inserts 10M - rows into the first replica laid out as 50 partitions of 200k rows each, - then issues a single ALTER TABLE that bundles 50 EXPORT PARTITION clauses - pointing at the same IcebergS3 destination. Waits for every partition to - reach COMPLETED in system.replicated_partition_exports and asserts the - total row count round-trips end-to-end. + Reproduction of the QA scenario reported against EXPORT PARTITION on a + two-replica ReplicatedMergeTree. Each partition is exported with its own + ALTER TABLE statement, which mirrors how QA drove the failure (every + statement reports success but the destination row count does not match). + + Schema mirrors QA: + - ``event_month`` is a MATERIALIZED Int32 derived from ``event_time``, + and is the partition expression. + - Wide row layout with a SHA256-derived ``payload`` column so each + row is several hundred bytes. 
+ + Data layout mirrors QA: + - 10M rows distributed deterministically across 50 monthly partitions + (200k rows per partition, partition IDs ``201501``..``201902``). + - Insert uses ``max_insert_threads = 8`` with no ``max_insert_block_size`` + override, so each partition typically ends up with multiple parts + (instead of exactly one part per partition as in the prior version of + this test, which made the bug invisible). + + Export pattern mirrors QA: + - 50 separate ``ALTER TABLE ... EXPORT PARTITION ID '<partition_id>'`` + statements, NOT one ALTER bundling 50 clauses. """ replica1 = cluster.instances["replica1"] replica2 = cluster.instances["replica2"] uid = unique_suffix() - mt_table = f"mt_many_{uid}" - iceberg_table = f"iceberg_many_{uid}" - - cols = "id Int64, name String, value Float64, event_dt Date, part_id Int32" + mt_table = f"mt_qa_{uid}" + iceberg_table = f"iceberg_qa_{uid}" + + # Source RMT schema (with MATERIALIZED partition column, matching QA). + rmt_cols = ( + "event_month Int32 MATERIALIZED toInt32(toYYYYMM(toDateTime(event_time))), " + "id Int64, " + "event_time DateTime64(6), " + "user_id Int32, " + "category String, " + "value Float64, " + "payload String" + ) + # Iceberg destination has the same columns but no MATERIALIZED expression. 
+ iceberg_cols = ( + "event_month Int32, " + "id Int64, " + "event_time DateTime64(6), " + "user_id Int32, " + "category String, " + "value Float64, " + "payload String" + ) - make_rmt(replica1, mt_table, cols, "part_id", - replica_name="replica1", order_by="id") - make_rmt(replica2, mt_table, cols, "part_id", - replica_name="replica2", order_by="id") + make_rmt(replica1, mt_table, rmt_cols, "event_month", + replica_name="replica1", order_by="(event_time, id)") + make_rmt(replica2, mt_table, rmt_cols, "event_month", + replica_name="replica2", order_by="(event_time, id)") - make_iceberg_s3(replica1, iceberg_table, cols, "part_id") - make_iceberg_s3(replica2, iceberg_table, cols, "part_id", + make_iceberg_s3(replica1, iceberg_table, iceberg_cols, "event_month") + make_iceberg_s3(replica2, iceberg_table, iceberg_cols, "event_month", if_not_exists=True) num_partitions = 50 rows_per_partition = 200_000 - total_rows = num_partitions * rows_per_partition + total_rows = num_partitions * rows_per_partition # 10,000,000 - # Insert all rows on the first replica only. max_insert_block_size is set to - # the full row count so the writer accumulates one block before splitting it - # by partition, producing exactly one part per partition_id. + # Insert all rows on replica1 only. Each row's event_time selects which + # monthly partition it lands in (rows 0..199999 -> 2015-01, etc.). No + # max_insert_block_size override is set, so the insert is split into + # multiple parts per partition under max_insert_threads = 8. 
replica1.query( f""" - INSERT INTO {mt_table} + INSERT INTO {mt_table} (id, event_time, user_id, category, value, payload) SELECT - number AS id, - toString(number) AS name, - toFloat64(number) * 1.5 AS value, - toDate('2024-01-01') + (number % 365) AS event_dt, - toInt32(number % {num_partitions}) AS part_id - FROM numbers({total_rows}) - SETTINGS max_insert_block_size = {total_rows}, - min_insert_block_size_rows = {total_rows} + n AS id, + toDateTime64( + toStartOfMonth(toDateTime('2015-01-01 00:00:00')) + + toIntervalMonth(intDiv(n, {rows_per_partition})) + + toIntervalSecond(n % {rows_per_partition}), + 6 + ) AS event_time, + toInt32((n * 48271) % 2147483647) AS user_id, + concat('c', toString(n % 10)) AS category, + (n % 1000000) / 1000. AS value, + lower(hex(SHA256(toString(n)))) AS payload + FROM (SELECT number AS n FROM numbers({total_rows})) + SETTINGS max_insert_threads = 8 """ ) @@ -990,15 +1029,28 @@ def test_export_50_partitions_in_single_alter_to_iceberg(cluster): f"Expected {total_rows} rows in source MergeTree, got {inserted}" ) - export_clauses = ", ".join( - f"EXPORT PARTITION ID '{p}' TO TABLE {iceberg_table}" - for p in range(num_partitions) + # Make sure the second replica has the data before issuing exports. + replica2.query(f"SYSTEM SYNC REPLICA {mt_table}", timeout=600) + + # Build the 50 monthly partition IDs starting from 201501. + partition_ids = [ + f"{2015 + i // 12}{(i % 12) + 1:02d}" for i in range(num_partitions) + ] + + # Issue 50 SEPARATE ALTER TABLE statements (mirroring QA's reproduction) + # concatenated into one semicolon-separated script and sent in a single + # query call. Bundling them into a single ALTER with comma-separated + # clauses hides the issue -- the bug only shows up reliably when each + # partition is its own statement. 
+ export_script = "\n".join( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table};" + for pid in partition_ids ) - replica1.query(f"ALTER TABLE {mt_table} {export_clauses}") + replica1.query(export_script) - for p in range(num_partitions): + for pid in partition_ids: wait_for_export_status( - replica1, mt_table, iceberg_table, str(p), + replica1, mt_table, iceberg_table, pid, expected_status="COMPLETED", timeout=600, ) @@ -1006,3 +1058,17 @@ def test_export_50_partitions_in_single_alter_to_iceberg(cluster): assert count == total_rows, ( f"Expected {total_rows} rows in Iceberg table after export, got {count}" ) + + # Per-partition row count must also match (200k rows each). Mismatches at + # this granularity are what QA actually observed. + mismatches = [] + for pid in partition_ids: + per_part = int(replica1.query( + f"SELECT count() FROM {iceberg_table} WHERE event_month = {int(pid)}" + ).strip()) + if per_part != rows_per_partition: + mismatches.append((pid, per_part)) + assert not mismatches, ( + f"Per-partition row counts do not match (expected {rows_per_partition} " + f"rows per partition): {mismatches}" + )