-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Spark 4.1: Implement SupportsReportOrdering in scan to skip redundant sorts #16454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3cd2db5
1926752
e694f86
375d047
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1170,4 +1170,176 @@ private static NamedReference fieldRef(String col) { | |
| private static UserDefinedScalarFunc toUDF(BoundFunction function, Expression[] expressions) { | ||
| return new UserDefinedScalarFunc(function.name(), function.canonicalName(), expressions); | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // SupportsReportOrdering — tests that sort_order_id is surfaced as | ||
| // outputOrdering so Spark can skip redundant sorts above sorted scans. | ||
| // Tracks https://github.com/apache/iceberg/issues/16430. | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
// Creates a (user_id, event_time) table with 'read.split.target-size' and
// 'read.split.open-file-cost' both set to 1, replaces the table sort order with
// event_time ASC, writes two separate INSERT batches, and asserts that the EXPLAIN
// output of "ORDER BY event_time LIMIT 100" contains no "Sort [" text.
// NOTE(review): ORDER BY combined with LIMIT is typically planned by Spark as
// TakeOrderedAndProject, which would not print "Sort [" either way — confirm this
// assertion actually fails when the scan does not report an ordering.
| @TestTemplate | ||
| public void testOutputOrderingForSingleAscSortKey() { | ||
| sql( | ||
| "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," | ||
| + " 'read.split.open-file-cost'='1')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
| Table table = validationCatalog.loadTable(tableIdent); | ||
| table.replaceSortOrder().asc("event_time").commit(); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')," | ||
| + " (2, TIMESTAMP '2024-01-01 01:00:00')", | ||
| tableName); | ||
| sql( | ||
| "INSERT INTO %s VALUES (3, TIMESTAMP '2024-01-02 00:00:00')," | ||
| + " (4, TIMESTAMP '2024-01-02 01:00:00')", | ||
| tableName); | ||
|
|
||
| String plan = explainPlan("SELECT * FROM %s ORDER BY event_time LIMIT 100", tableName); | ||
| assertThat(plan) | ||
| .as("Sort eliminated when scan advertises outputOrdering for ASC sort key") | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sort eliminated |
||
| .doesNotContain("Sort ["); | ||
| } | ||
|
|
||
// Same setup as the ascending case but with the table sort order replaced by
// event_time DESC and a single two-row INSERT; asserts that the EXPLAIN output of
// "ORDER BY event_time DESC LIMIT 100" contains no "Sort [" text.
// NOTE(review): ORDER BY combined with LIMIT is typically planned by Spark as
// TakeOrderedAndProject, which would not print "Sort [" either way — confirm this
// assertion actually fails when the scan does not report an ordering.
| @TestTemplate | ||
| public void testOutputOrderingForDescSortKey() { | ||
| sql( | ||
| "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," | ||
| + " 'read.split.open-file-cost'='1')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
| validationCatalog.loadTable(tableIdent).replaceSortOrder().desc("event_time").commit(); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-02 00:00:00')," | ||
| + " (2, TIMESTAMP '2024-01-01 00:00:00')", | ||
| tableName); | ||
|
|
||
| String plan = explainPlan("SELECT * FROM %s ORDER BY event_time DESC LIMIT 100", tableName); | ||
| assertThat(plan).as("Sort eliminated for DESC sort key").doesNotContain("Sort ["); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testOutputOrderingForCompositeSortKey() { | ||
| sql( | ||
| "CREATE TABLE %s (region STRING, user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," | ||
| + " 'read.split.open-file-cost'='1')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
| validationCatalog | ||
| .loadTable(tableIdent) | ||
| .replaceSortOrder() | ||
| .asc("region") | ||
| .asc("user_id") | ||
| .commit(); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES ('us', 1, TIMESTAMP '2024-01-01 00:00:00')," | ||
| + " ('us', 2, TIMESTAMP '2024-01-02 00:00:00')," | ||
| + " ('eu', 3, TIMESTAMP '2024-01-03 00:00:00')", | ||
| tableName); | ||
|
|
||
| String plan = explainPlan("SELECT * FROM %s ORDER BY region, user_id LIMIT 100", tableName); | ||
| assertThat(plan).as("Sort eliminated for composite sort key").doesNotContain("Sort ["); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testNoOutputOrderingForUnsortedTable() { | ||
| sql( | ||
| "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')," | ||
| + " (2, TIMESTAMP '2024-01-02 00:00:00')", | ||
| tableName); | ||
|
|
||
| // Window function forces an explicit local Sort that outputOrdering would otherwise satisfy | ||
| String plan = | ||
| explainPlan( | ||
| "SELECT user_id, event_time," | ||
| + " ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s", | ||
| tableName); | ||
| assertThat(plan).as("Sort required for unsorted table").contains("Sort ["); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testNoOutputOrderingForMixedSortOrderIds() { | ||
| sql( | ||
| "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," | ||
| + " 'read.split.open-file-cost'='1')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
|
|
||
| Table table = validationCatalog.loadTable(tableIdent); | ||
| table.replaceSortOrder().asc("event_time").commit(); | ||
| sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); | ||
|
|
||
| table.refresh(); | ||
| table.replaceSortOrder().asc("user_id").commit(); | ||
| sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); | ||
|
|
||
| String plan = | ||
| explainPlan( | ||
| "SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY user_id) AS rn FROM %s", | ||
| tableName); | ||
| assertThat(plan).as("Sort required when sort_order_id differs across files").contains("Sort ["); | ||
| } | ||
|
|
||
// Replaces the table sort order with an ascending bucket("user_id", 8) term, writes two
// rows, and asserts that the EXPLAIN output of a ROW_NUMBER() window ordered by user_id
// still contains "Sort [".
// The bucket expression is referenced by its fully qualified name — presumably to avoid a
// clash with another imported Expressions class; verify against the file's imports.
// NOTE(review): the window has no PARTITION BY, so Spark presumably shuffles to a single
// partition before sorting — confirm the Sort would really disappear if an ordering were
// reported, otherwise this assertion cannot fail.
| @TestTemplate | ||
| public void testNoOutputOrderingForBucketTransform() { | ||
| sql( | ||
| "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s', 'read.split.target-size'='1'," | ||
| + " 'read.split.open-file-cost'='1')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
| validationCatalog | ||
| .loadTable(tableIdent) | ||
| .replaceSortOrder() | ||
| .asc(org.apache.iceberg.expressions.Expressions.bucket("user_id", 8)) | ||
| .commit(); | ||
|
|
||
| sql( | ||
| "INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')," | ||
| + " (2, TIMESTAMP '2024-01-02 00:00:00')", | ||
| tableName); | ||
|
|
||
| String plan = | ||
| explainPlan( | ||
| "SELECT user_id, event_time," + " ROW_NUMBER() OVER (ORDER BY user_id) AS rn FROM %s", | ||
| tableName); | ||
| assertThat(plan).as("Sort required: bucket transform is hash, not range").contains("Sort ["); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Required sorting cases |
||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testNoOutputOrderingForMultiFileTaskGroup() { | ||
| // Default split size — small files get bin-packed into multi-file task groups | ||
| sql( | ||
| "CREATE TABLE %s (user_id BIGINT, event_time TIMESTAMP) USING iceberg" | ||
| + " TBLPROPERTIES ('%s'='%s')", | ||
| tableName, TableProperties.DEFAULT_FILE_FORMAT, format); | ||
| validationCatalog.loadTable(tableIdent).replaceSortOrder().asc("event_time").commit(); | ||
|
|
||
| sql("INSERT INTO %s VALUES (1, TIMESTAMP '2024-01-01 00:00:00')", tableName); | ||
| sql("INSERT INTO %s VALUES (2, TIMESTAMP '2024-01-02 00:00:00')", tableName); | ||
|
|
||
| String plan = | ||
| explainPlan( | ||
| "SELECT user_id, event_time," | ||
| + " ROW_NUMBER() OVER (ORDER BY event_time) AS rn FROM %s", | ||
| tableName); | ||
| assertThat(plan) | ||
| .as("Sort required: multi-file task group does not preserve per-file ordering") | ||
| .contains("Sort ["); | ||
| } | ||
|
|
||
| private String explainPlan(String sqlTemplate, Object... args) { | ||
| return spark | ||
| .sql("EXPLAIN " + String.format(sqlTemplate, args)) | ||
| .collectAsList() | ||
| .get(0) | ||
| .getString(0); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is any uncertainty about the ordering, an empty result is returned, which keeps behavior identical to today's.