Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
Expand All @@ -27,14 +28,19 @@
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortOrder;

class SortOrderToSpark implements SortOrderVisitor<SortOrder> {
public class SortOrderToSpark implements SortOrderVisitor<SortOrder> {

private final Map<Integer, String> quotedNameById;

/**
 * Creates a visitor for the given schema.
 *
 * <p>The schema is indexed once via {@code SparkSchemaUtil.indexQuotedNameById} and the resulting
 * id-to-name map is kept for the lifetime of the visitor.
 *
 * @param schema the schema handed to {@code SparkSchemaUtil.indexQuotedNameById}
 */
SortOrderToSpark(Schema schema) {
this.quotedNameById = SparkSchemaUtil.indexQuotedNameById(schema);
}

/**
 * Translates an Iceberg sort order into an array of Spark sort orders.
 *
 * <p>A fresh {@link SortOrderToSpark} visitor is built from {@code schema} and applied to {@code
 * order} through {@code SortOrderVisitor.visit}; the visited results are returned as an array.
 *
 * @param order the Iceberg sort order to visit
 * @param schema the schema used to construct the visitor
 * @return the visitor results, in the order produced by {@code SortOrderVisitor.visit}
 */
public static SortOrder[] convert(org.apache.iceberg.SortOrder order, Schema schema) {
  SortOrderToSpark visitor = new SortOrderToSpark(schema);
  List<SortOrder> sparkOrders = SortOrderVisitor.visit(order, visitor);
  return sparkOrders.toArray(new SortOrder[0]);
}

@Override
public SortOrder field(String sourceName, int id, SortDirection direction, NullOrder nullOrder) {
return Expressions.sort(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -41,14 +42,18 @@
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SortOrderToSpark;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.SupportsReportOrdering;
import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
import org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning;
import org.apache.spark.sql.connector.read.partitioning.Partitioning;
Expand All @@ -57,7 +62,7 @@
import org.slf4j.LoggerFactory;

abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends SparkScan
implements SupportsReportPartitioning {
implements SupportsReportPartitioning, SupportsReportOrdering {

private static final Logger LOG = LoggerFactory.getLogger(SparkPartitioningAwareScan.class);

Expand Down Expand Up @@ -123,6 +128,63 @@ public Partitioning outputPartitioning() {
}
}

/**
 * Reports the ordering of rows produced by this scan.
 *
 * <p>An empty array is returned unless every one of the following holds: there is at least one
 * planned task; {@code uniformSortOrderId} yields a single id that is not the unsorted order's
 * id; the table has a sort order registered under that id and it is not unsorted; every task
 * group holds exactly one task; and every field of the sort order uses the identity transform.
 * Only then is the Iceberg sort order converted with {@code SortOrderToSpark.convert}.
 *
 * @return the Spark sort orders for this scan, or an empty array when any check above fails
 */
@Override
public SortOrder[] outputOrdering() {
List<T> planned = tasks();
// Nothing planned: report no ordering.
if (planned.isEmpty()) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any uncertainty about ordering → empty result → identical to today

return new SortOrder[0];
}

// A null id means the tasks do not share one sort order id (or are not file scan tasks).
Integer uniformId = uniformSortOrderId(planned);
if (uniformId == null || uniformId == org.apache.iceberg.SortOrder.unsorted().orderId()) {
return new SortOrder[0];
}

// The id must still resolve to a known, sorted order in the table metadata.
org.apache.iceberg.SortOrder icebergOrder = table().sortOrders().get(uniformId);
if (icebergOrder == null || icebergOrder.isUnsorted()) {
return new SortOrder[0];
}

// Any task group with more or fewer than one task disables ordering reporting.
if (!allTaskGroupsAreSingleFile()) {
return new SortOrder[0];
}

// Only identity-transformed sort fields are reported.
if (!allFieldsAreIdentity(icebergOrder)) {
return new SortOrder[0];
}

return SortOrderToSpark.convert(icebergOrder, table().schema());
}

/**
 * Finds the sort order id shared by all planned tasks.
 *
 * @param planned the tasks to inspect
 * @return the common sort order id, or {@code null} when {@code planned} is empty, when any task
 *     is not a {@link FileScanTask}, when any file carries no sort order id, or when two files
 *     carry different ids
 */
private Integer uniformSortOrderId(List<T> planned) {
  Integer commonId = null;
  for (T task : planned) {
    if (!(task instanceof FileScanTask)) {
      return null;
    }

    // sortOrderId() returns a boxed Integer that may be null for files written without one;
    // assigning it to an int would throw a NullPointerException while planning the scan.
    Integer taskId = ((FileScanTask) task).file().sortOrderId();
    if (taskId == null) {
      return null;
    }

    if (commonId == null) {
      commonId = taskId;
    } else if (!commonId.equals(taskId)) {
      return null;
    }
  }
  return commonId;
}

/**
 * Checks whether every task group returned by {@code taskGroups()} holds exactly one task.
 *
 * @return {@code true} when no group has a task count other than one (including when there are
 *     no groups at all), {@code false} otherwise
 */
private boolean allTaskGroupsAreSingleFile() {
  return taskGroups().stream().allMatch(taskGroup -> taskGroup.tasks().size() == 1);
}

/**
 * Checks whether every field of the given sort order uses the identity transform.
 *
 * @param order the Iceberg sort order to inspect
 * @return {@code true} when each field's transform equals {@code Transforms.identity()}, which
 *     is also the result for an order without fields
 */
private static boolean allFieldsAreIdentity(org.apache.iceberg.SortOrder order) {
  for (org.apache.iceberg.SortField sortField : order.fields()) {
    if (!sortField.transform().equals(Transforms.identity())) {
      return false;
    }
  }
  return true;
}

@Override
protected StructType groupingKeyType() {
if (groupingKeyType == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,4 +1170,176 @@ private static NamedReference fieldRef(String col) {
private static UserDefinedScalarFunc toUDF(BoundFunction function, Expression[] expressions) {
return new UserDefinedScalarFunc(function.name(), function.canonicalName(), expressions);
}

// ---------------------------------------------------------------------------
// SupportsReportOrdering — tests that sort_order_id is surfaced as
// outputOrdering so Spark can skip redundant sorts above sorted scans.
// Tracks https://github.com/apache/iceberg/issues/16430.
// ---------------------------------------------------------------------------

// Positive case: the table sort order is ASC event_time, two INSERTs are issued, and the EXPLAIN
// text of an ORDER BY event_time query is asserted not to contain a "Sort [" node.
@TestTemplate
public void testOutputOrderingForSingleAscSortKey() {
// Both split properties are set to 1 — presumably so each data file ends up in its own task
// group; verify against the split planner.
sql(
"CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+ " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+ " 'read.split.open-file-cost'='1')",
tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
Table table = validationCatalog.loadTable(tableIdent);
table.replaceSortOrder().asc("event_time").commit();

sql(
"INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00'),"
+ " (2, TIMESTAMP '2024-01-01 01:00:00')",
tableName);
sql(
"INSERT INTO %s VALUES (3, TIMESTAMP '2024-01-02 00:00:00'),"
+ " (4, TIMESTAMP '2024-01-02 01:00:00')",
tableName);

// NOTE(review): ORDER BY combined with LIMIT is typically planned by Spark as
// TakeOrderedAndProject, whose EXPLAIN text has no "Sort [" node whether or not the scan
// reports an ordering — confirm this assertion can actually fail (a SORT BY query without
// LIMIT may be a better probe).
String plan = explainPlan("SELECT * FROM %s ORDER BY event_time LIMIT 100", tableName);
assertThat(plan)
.as("Sort eliminated when scan advertises outputOrdering for ASC sort key")

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sort eliminated

.doesNotContain("Sort [");
}

// Positive case for a descending key: the table sort order is DESC event_time, one INSERT is
// issued, and the EXPLAIN text of an ORDER BY event_time DESC query is asserted not to contain
// a "Sort [" node.
@TestTemplate
public void testOutputOrderingForDescSortKey() {
// Both split properties are set to 1 — presumably so each data file ends up in its own task
// group; verify against the split planner.
sql(
"CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+ " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+ " 'read.split.open-file-cost'='1')",
tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
validationCatalog.loadTable(tableIdent).replaceSortOrder().desc("event_time").commit();

sql(
"INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-02 00:00:00'),"
+ " (2, TIMESTAMP '2024-01-01 00:00:00')",
tableName);

// NOTE(review): ORDER BY combined with LIMIT is typically planned by Spark as
// TakeOrderedAndProject, whose EXPLAIN text has no "Sort [" node whether or not the scan
// reports an ordering — confirm this assertion can actually fail (a SORT BY query without
// LIMIT may be a better probe).
String plan = explainPlan("SELECT * FROM %s ORDER BY event_time DESC LIMIT 100", tableName);
assertThat(plan).as("Sort eliminated for DESC sort key").doesNotContain("Sort [");

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+ TakeOrderedAndProject(limit=100, orderBy=[event_time DESC])
+   +- BatchScan ...                              Sort eliminated, DESC works

}

// Positive case for a two-column key: the table sort order is ASC region, ASC user_id, and a
// query that needs exactly that per-partition ordering must plan without a Sort node.
@TestTemplate
public void testOutputOrderingForCompositeSortKey() {
  sql(
      "CREATE TABLE %s (region STRING, user_id BIGINT, event_time TIMESTAMP) USING iceberg"
          + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
          + " 'read.split.open-file-cost'='1')",
      tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
  validationCatalog
      .loadTable(tableIdent)
      .replaceSortOrder()
      .asc("region")
      .asc("user_id")
      .commit();

  sql(
      "INSERT INTO %s VALUES ('us', 1, TIMESTAMP '2024-01-01 00:00:00'),"
          + " ('us', 2, TIMESTAMP '2024-01-02 00:00:00'),"
          + " ('eu', 3, TIMESTAMP '2024-01-03 00:00:00')",
      tableName);

  // SORT BY requests a per-partition sort, which Spark plans as a Sort node unless the scan
  // already reports a matching ordering. The previous ORDER BY ... LIMIT query is planned as
  // TakeOrderedAndProject, which never prints "Sort [", so the assertion could not fail.
  String plan = explainPlan("SELECT * FROM %s SORT BY region, user_id", tableName);
  assertThat(plan).as("Sort eliminated for composite sort key").doesNotContain("Sort [");
}

// Negative case: no replaceSortOrder() call is made on the table, and the EXPLAIN text of a
// window query ordered by event_time is asserted to contain a "Sort [" node.
@TestTemplate
public void testNoOutputOrderingForUnsortedTable() {
  String createTable =
      "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
          + " TBLPROPERTIES ('%s'='%s')";
  sql(createTable, tableName, TableProperties.DEFAULT_FILE_FORMAT, format);

  String insertRows =
      "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00'),"
          + " (2, TIMESTAMP '2024-01-02 00:00:00')";
  sql(insertRows, tableName);

  // The window's ORDER BY needs sorted input, so a Sort node is expected unless the scan
  // itself reports that ordering.
  String windowQuery =
      "SELECT user_id, event_time, ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s";
  String plan = explainPlan(windowQuery, tableName);

  assertThat(plan).as("Sort required for unsorted table").contains("Sort [");
}

// Negative case: one row is inserted under sort order ASC event_time and another under
// ASC user_id, and the EXPLAIN text of a window query ordered by user_id is asserted to
// contain a "Sort [" node.
@TestTemplate
public void testNoOutputOrderingForMixedSortOrderIds() {
  String createTable =
      "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
          + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
          + " 'read.split.open-file-cost'='1')";
  sql(createTable, tableName, TableProperties.DEFAULT_FILE_FORMAT, format);

  Table table = validationCatalog.loadTable(tableIdent);

  // First insert happens while the table sort order is ASC event_time.
  table.replaceSortOrder().asc("event_time").commit();
  sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName);

  // Pick up the commit made by the SQL insert before replacing the sort order again.
  table.refresh();
  table.replaceSortOrder().asc("user_id").commit();
  sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName);

  // NOTE(review): a window with no PARTITION BY typically gathers all rows into one partition
  // before sorting, so with more than one scan partition a Sort may appear regardless of the
  // ordering the scan reports — confirm this assertion can fail.
  String windowQuery =
      "SELECT user_id, event_time, ROW_NUMBER() OVER (ORDER BY user_id) AS rn FROM %s";
  String plan = explainPlan(windowQuery, tableName);

  assertThat(plan).as("Sort required when sort_order_id differs across files").contains("Sort [");
}

// Negative case: the table sort order is ASC bucket(user_id, 8), and the EXPLAIN text of a
// window query ordered by the raw user_id column is asserted to contain a "Sort [" node.
@TestTemplate
public void testNoOutputOrderingForBucketTransform() {
// Both split properties are set to 1 — presumably so each data file ends up in its own task
// group; verify against the split planner.
sql(
"CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
+ " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1',"
+ " 'read.split.open-file-cost'='1')",
tableName, TableProperties.DEFAULT_FILE_FORMAT, format);
// Fully qualified because this is Iceberg's expression factory, not Spark's.
validationCatalog
.loadTable(tableIdent)
.replaceSortOrder()
.asc(org.apache.iceberg.expressions.Expressions.bucket("user_id", 8))
.commit();

sql(
"INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00'),"
+ " (2, TIMESTAMP '2024-01-02 00:00:00')",
tableName);

// The query orders by user_id itself, while the files are ordered by its bucket value.
String plan =
explainPlan(
"SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY user_id) AS rn FROM %s",
tableName);
assertThat(plan).as("Sort required: bucket transform is hash, not range").contains("Sort [");

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required sorting cases

}

// Negative case: the table sort order is ASC event_time but no split properties are overridden,
// and the EXPLAIN text of a window query ordered by event_time is asserted to contain a
// "Sort [" node.
@TestTemplate
public void testNoOutputOrderingForMultiFileTaskGroup() {
  // Split sizing is left at its defaults; the two small files written below are expected to be
  // bin-packed into one multi-file task group.
  String createTable =
      "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg"
          + " TBLPROPERTIES ('%s'='%s')";
  sql(createTable, tableName, TableProperties.DEFAULT_FILE_FORMAT, format);

  Table table = validationCatalog.loadTable(tableIdent);
  table.replaceSortOrder().asc("event_time").commit();

  // Two separate INSERT statements, one row each.
  sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName);
  sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName);

  String windowQuery =
      "SELECT user_id, event_time, ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s";
  String plan = explainPlan(windowQuery, tableName);

  assertThat(plan)
      .as("Sort required: multi-file task group does not preserve per-file ordering")
      .contains("Sort [");
}

/**
 * Runs {@code EXPLAIN} on a formatted query and returns the plan text.
 *
 * @param sqlTemplate a {@link String#format} template for the query to explain
 * @param args the arguments substituted into {@code sqlTemplate}
 * @return the string in the first column of the first row of the EXPLAIN result
 */
private String explainPlan(String sqlTemplate, Object... args) {
  String explainSql = "EXPLAIN " + String.format(sqlTemplate, args);
  return spark.sql(explainSql).collectAsList().get(0).getString(0);
}
}
Loading