From 3cd2db50d28b2e1306dd1449f31182fdb1e28b35 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 21 May 2026 00:50:00 +0530 Subject: [PATCH 1/4] Spark 4.1: Expose SortOrderToSpark.convert helper for scan ordering use --- .../java/org/apache/iceberg/spark/SortOrderToSpark.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java index 781f61b33f0e..82ad0eafd27c 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark; +import java.util.List; import java.util.Map; import org.apache.iceberg.NullOrder; import org.apache.iceberg.Schema; @@ -27,7 +28,7 @@ import org.apache.spark.sql.connector.expressions.NullOrdering; import org.apache.spark.sql.connector.expressions.SortOrder; -class SortOrderToSpark implements SortOrderVisitor { +public class SortOrderToSpark implements SortOrderVisitor { private final Map quotedNameById; @@ -35,6 +36,11 @@ class SortOrderToSpark implements SortOrderVisitor { this.quotedNameById = SparkSchemaUtil.indexQuotedNameById(schema); } + public static SortOrder[] convert(org.apache.iceberg.SortOrder order, Schema schema) { + List converted = SortOrderVisitor.visit(order, new SortOrderToSpark(schema)); + return converted.toArray(new SortOrder[0]); + } + @Override public SortOrder field(String sourceName, int id, SortDirection direction, NullOrder nullOrder) { return Expressions.sort( From 1926752a0f1eb58c790e986f29fc1d29f7badb63 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 21 May 2026 00:50:13 +0530 Subject: [PATCH 2/4] Spark 4.1: Implement SupportsReportOrdering in SparkPartitioningAwareScan (#16430) --- .../source/SparkPartitioningAwareScan.java | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java index fe5eeee8fb10..73ac37b86e83 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java @@ -27,6 +27,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionScanTask; import org.apache.iceberg.PartitionSpec; @@ -41,14 +42,18 @@ import org.apache.iceberg.metrics.ScanReport; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SortOrderToSpark; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.SortOrder; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.read.SupportsReportOrdering; import org.apache.spark.sql.connector.read.SupportsReportPartitioning; import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning; import org.apache.spark.sql.connector.read.partitioning.Partitioning; @@ -57,7 +62,7 @@ import org.slf4j.LoggerFactory; abstract class SparkPartitioningAwareScan extends SparkScan - implements SupportsReportPartitioning { + implements SupportsReportPartitioning, SupportsReportOrdering { private static final Logger LOG = LoggerFactory.getLogger(SparkPartitioningAwareScan.class); @@ -123,6 +128,63 @@ public Partitioning outputPartitioning() { } } + @Override + public SortOrder[] outputOrdering() { + List planned = tasks(); + if (planned.isEmpty()) { + return new SortOrder[0]; + } + + Integer uniformId = uniformSortOrderId(planned); + if (uniformId == null || uniformId == org.apache.iceberg.SortOrder.unsorted().orderId()) { + return new SortOrder[0]; + } + + org.apache.iceberg.SortOrder icebergOrder = table().sortOrders().get(uniformId); + if (icebergOrder == null || icebergOrder.isUnsorted()) { + return new SortOrder[0]; + } + + if (!allTaskGroupsAreSingleFile()) { + return new SortOrder[0]; + } + + if (!allFieldsAreIdentity(icebergOrder)) { + return new SortOrder[0]; + } + + return SortOrderToSpark.convert(icebergOrder, table().schema()); + } + + private Integer uniformSortOrderId(List planned) { + Integer id = null; + for (T task : planned) { + if (!(task instanceof FileScanTask)) { + return null; + } + int taskId = ((FileScanTask) task).file().sortOrderId(); + if (id == null) { + id = taskId; + } else if (id != taskId) { + return null; + } + } + return id; + } + + private boolean allTaskGroupsAreSingleFile() { + for (ScanTaskGroup group : taskGroups()) { + if (group.tasks().size() != 1) { + return false; + } + } + return true; + } + + private static boolean allFieldsAreIdentity(org.apache.iceberg.SortOrder order) { + return order.fields().stream().allMatch(f -> f.transform().equals(Transforms.identity())); + } + @Override protected StructType groupingKeyType() { if (groupingKeyType == null) { From e694f86ad90bb79666c2dce3fda378cb37c91b00 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 21 May 2026 00:50:20 +0530 Subject: [PATCH 3/4] Spark 4.1: Add invariant unit tests for scan outputOrdering --- ...TestSparkScanOutputOrderingInvariants.java | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScanOutputOrderingInvariants.java diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScanOutputOrderingInvariants.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScanOutputOrderingInvariants.java new file mode 100644 index 000000000000..2aa653296623 --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScanOutputOrderingInvariants.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.TestBaseWithCatalog; +import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Unit-style tests for the invariants of {@code SparkPartitioningAwareScan.outputOrdering()}. Each + * test creates the minimum table state needed to exercise exactly one invariant and asserts on the + * returned {@link SortOrder}[] directly. + * + *

End-to-end physical plan validation lives in {@link TestSparkScan}. + * + *

Tracks https://github.com/apache/iceberg/issues/16430. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkScanOutputOrderingInvariants extends TestBaseWithCatalog { + + @Parameter(index = 3) + private String format; + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + "parquet" + } + }; + } + + @BeforeEach + public void useCatalog() { + sql("USE %s", catalogName); + } + + @AfterEach + public void dropTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void uniformSortOrderIdProducesNonEmptyOrdering() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('write.format.default'='%s')", + tableName, format); + Table table = validationCatalog.loadTable(tableIdent); + table.replaceSortOrder().asc("event_time").commit(); + + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); + + assertThat(outputOrderingOf(table)).as("uniform sort_order_id").hasSize(1); + } + + @TestTemplate + public void mixedSortOrderIdsProduceEmptyOrdering() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('write.format.default'='%s')", + tableName, format); + + Table table = validationCatalog.loadTable(tableIdent); + table.replaceSortOrder().asc("event_time").commit(); + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + + table.refresh(); + table.replaceSortOrder().asc("user_id").commit(); + sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); + table.refresh(); + + assertThat(outputOrderingOf(table)).as("mixed sort_order_id -> []").isEmpty(); + } + + @TestTemplate + public void unsortedTableProducesEmptyOrdering() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('write.format.default'='%s')", + tableName, format); + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + + assertThat(outputOrderingOf(validationCatalog.loadTable(tableIdent))) + .as("sort_order_id = 0 -> []") + .isEmpty(); + } + + @TestTemplate + public void bucketTransformSortKeyProducesEmptyOrdering() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('write.format.default'='%s')", + tableName, format); + Table table = validationCatalog.loadTable(tableIdent); + table + .replaceSortOrder() + .asc(org.apache.iceberg.expressions.Expressions.bucket("user_id", 8)) + .commit(); + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + + assertThat(outputOrderingOf(table)).as("bucket transform -> []").isEmpty(); + } + + @TestTemplate + public void multiFileTaskGroupProducesEmptyOrdering() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('write.format.default'='%s')", + tableName, format); + Table table = validationCatalog.loadTable(tableIdent); + table.replaceSortOrder().asc("event_time").commit(); + + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); + + // Default split size = 128 MB packs tiny test files into one task group + SortOrder[] ordering = + ((org.apache.spark.sql.connector.read.SupportsReportOrdering) + new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty()).build()) + .outputOrdering(); + assertThat(ordering).as("multi-file task group -> []").isEmpty(); + } + + @TestTemplate + public void compositeIdentitySortKeyProducesAllFields() { + sql( + "CREATE TABLE %s (region STRING, user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('write.format.default'='%s')", + tableName, format); + Table table = validationCatalog.loadTable(tableIdent); + table.replaceSortOrder().asc("region").asc("user_id").commit(); + + sql( + "INSERT INTO %s VALUES ('us', 1, TIMESTAMP '2024-01-01 00:00:00')," + + " ('us', 2, TIMESTAMP '2024-01-02 00:00:00')", + tableName); + + assertThat(outputOrderingOf(table)).as("composite identity sort").hasSize(2); + } + + /** + * Builds a scan with split size = 1 byte so each file becomes its own task group, then returns + * the scan's reported output ordering. + */ + private SortOrder[] outputOrderingOf(Table table) { + CaseInsensitiveStringMap opts = + new CaseInsensitiveStringMap(Collections.singletonMap(SparkReadOptions.SPLIT_SIZE, "1")); + org.apache.spark.sql.connector.read.Scan scan = + new SparkScanBuilder(spark, table, opts).build(); + return ((org.apache.spark.sql.connector.read.SupportsReportOrdering) scan).outputOrdering(); + } +} From 375d04745a93ebaa95eb4f6020e94b4d56ef3346 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 21 May 2026 00:50:26 +0530 Subject: [PATCH 4/4] Spark 4.1: Add end-to-end plan tests for scan outputOrdering --- .../iceberg/spark/source/TestSparkScan.java | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java index fd133de4baa8..ad4a9adffa3f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java @@ -1170,4 +1170,176 @@ private static NamedReference fieldRef(String col) { private static UserDefinedScalarFunc toUDF(BoundFunction function, Expression[] expressions) { return new UserDefinedScalarFunc(function.name(), function.canonicalName(), expressions); } + + // --------------------------------------------------------------------------- + // SupportsReportOrdering — tests that sort_order_id is surfaced as + // outputOrdering so Spark can skip redundant sorts above sorted scans. + // Tracks https://github.com/apache/iceberg/issues/16430. + // --------------------------------------------------------------------------- + + @TestTemplate + public void testOutputOrderingForSingleAscSortKey() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," + + " 'read.split.open-file-cost'='1')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + Table table = validationCatalog.loadTable(tableIdent); + table.replaceSortOrder().asc("event_time").commit(); + + sql( + "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')," + + " (2, TIMESTAMP '2024-01-01 01:00:00')", + tableName); + sql( + "INSERT INTO %s VALUES (3, TIMESTAMP '2024-01-02 00:00:00')," + + " (4, TIMESTAMP '2024-01-02 01:00:00')", + tableName); + + String plan = explainPlan("SELECT * FROM %s ORDER BY event_time LIMIT 100", tableName); + assertThat(plan) + .as("Sort eliminated when scan advertises outputOrdering for ASC sort key") + .doesNotContain("Sort ["); + } + + @TestTemplate + public void testOutputOrderingForDescSortKey() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," + + " 'read.split.open-file-cost'='1')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + validationCatalog.loadTable(tableIdent).replaceSortOrder().desc("event_time").commit(); + + sql( + "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-02 00:00:00')," + + " (2, TIMESTAMP '2024-01-01 00:00:00')", + tableName); + + String plan = explainPlan("SELECT * FROM %s ORDER BY event_time DESC LIMIT 100", tableName); + assertThat(plan).as("Sort eliminated for DESC sort key").doesNotContain("Sort ["); + } + + @TestTemplate + public void testOutputOrderingForCompositeSortKey() { + sql( + "CREATE TABLE %s (region STRING, user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," + + " 'read.split.open-file-cost'='1')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + validationCatalog + .loadTable(tableIdent) + .replaceSortOrder() + .asc("region") + .asc("user_id") + .commit(); + + sql( + "INSERT INTO %s VALUES ('us', 1, TIMESTAMP '2024-01-01 00:00:00')," + + " ('us', 2, TIMESTAMP '2024-01-02 00:00:00')," + + " ('eu', 3, TIMESTAMP '2024-01-03 00:00:00')", + tableName); + + String plan = explainPlan("SELECT * FROM %s ORDER BY region, user_id LIMIT 100", tableName); + assertThat(plan).as("Sort eliminated for composite sort key").doesNotContain("Sort ["); + } + + @TestTemplate + public void testNoOutputOrderingForUnsortedTable() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + + sql( + "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')," + + " (2, TIMESTAMP '2024-01-02 00:00:00')", + tableName); + + // Window function forces an explicit local Sort that outputOrdering would otherwise satisfy + String plan = + explainPlan( + "SELECT user_id, event_time," + + " ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s", + tableName); + assertThat(plan).as("Sort required for unsorted table").contains("Sort ["); + } + + @TestTemplate + public void testNoOutputOrderingForMixedSortOrderIds() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," + + " 'read.split.open-file-cost'='1')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + + Table table = validationCatalog.loadTable(tableIdent); + table.replaceSortOrder().asc("event_time").commit(); + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + + table.refresh(); + table.replaceSortOrder().asc("user_id").commit(); + sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); + + String plan = + explainPlan( + "SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY user_id) AS rn FROM %s", + tableName); + assertThat(plan).as("Sort required when sort_order_id differs across files").contains("Sort ["); + } + + @TestTemplate + public void testNoOutputOrderingForBucketTransform() { + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," + + " 'read.split.open-file-cost'='1')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + validationCatalog + .loadTable(tableIdent) + .replaceSortOrder() + .asc(org.apache.iceberg.expressions.Expressions.bucket("user_id", 8)) + .commit(); + + sql( + "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')," + + " (2, TIMESTAMP '2024-01-02 00:00:00')", + tableName); + + String plan = + explainPlan( + "SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY user_id) AS rn FROM %s", + tableName); + assertThat(plan).as("Sort required: bucket transform is hash, not range").contains("Sort ["); + } + + @TestTemplate + public void testNoOutputOrderingForMultiFileTaskGroup() { + // Default split size — small files get bin-packed into multi-file task groups + sql( + "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" + + " TBLPROPERTIES ('%s'='%s')", + tableName, TableProperties.DEFAULT_FILE_FORMAT, format); + validationCatalog.loadTable(tableIdent).replaceSortOrder().asc("event_time").commit(); + + sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); + sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); + + String plan = + explainPlan( + "SELECT user_id, event_time," + + " ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s", + tableName); + assertThat(plan) + .as("Sort required: multi-file task group does not preserve per-file ordering") + .contains("Sort ["); + } + + private String explainPlan(String sqlTemplate, Object... args) { + return spark + .sql("EXPLAIN " + String.format(sqlTemplate, args)) + .collectAsList() + .get(0) + .getString(0); + } }