From 3a58a6a2a58c95107bf13695fddae241d176a11b Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 4 Mar 2026 09:41:32 -0800 Subject: [PATCH 1/3] fix branch delete for canDeleteWhere where it does not resolve to the correct branch --- .../iceberg/spark/source/SparkTable.java | 4 +++ .../iceberg/spark/sql/TestDeleteFrom.java | 30 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 07db8c4ed3fe..e180aa259001 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -197,6 +197,10 @@ public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInf public boolean canDeleteWhere(Predicate[] predicates) { Preconditions.checkArgument(timeTravel == null, "Cannot delete from table with time travel"); + if (SparkTableUtil.wapEnabled(table())) { + branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); + } + Expression deleteExpr = Expressions.alwaysTrue(); for (Predicate predicate : predicates) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index 536d568003cf..cdfbfa0907a6 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -24,9 +24,11 @@ import java.util.List; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkSQLProperties; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -185,4 +187,32 @@ public void testDeleteFromTablePartitionedByVarbinary() { ImmutableList.of(row(1L, new byte[] {-29, -68, -47})), sql("SELECT * FROM %s where data = X'e3bcd1'", tableName)); } + + @TestTemplate + public void testDeleteWithWapBranch() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES ('%s' = 'true')", + tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); + + spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1"); + try { + // all rows go into one file on the WAP branch; main stays empty + List records = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.coalesce(1).writeTo(tableName).append(); + + // delete a subset of rows - canDeleteWhere and deleteWhere must both + // resolve the WAP branch so they scan and commit to the same branch + sql("DELETE FROM %s WHERE id = 1", tableName); + + assertEquals( + "Should have deleted only the matching row on the WAP branch", + ImmutableList.of(row(2L, "b"), row(3L, "c")), + sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName)); + } finally { + spark.conf().unset(SparkSQLProperties.WAP_BRANCH); + } + } } From edc14de79cb0ccfdd436c4a5cfc8b0c5815ab2cb Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 4 Mar 2026 12:17:11 -0800 Subject: [PATCH 2/3] fixing test --- .../java/org/apache/iceberg/spark/source/SparkTable.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index e180aa259001..47fef4072915 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -197,10 +197,6 @@ public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInf public boolean canDeleteWhere(Predicate[] predicates) { Preconditions.checkArgument(timeTravel == null, "Cannot delete from table with time travel"); - if (SparkTableUtil.wapEnabled(table())) { - branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); - } - Expression deleteExpr = Expressions.alwaysTrue(); for (Predicate predicate : predicates) { @@ -212,7 +208,10 @@ public boolean canDeleteWhere(Predicate[] predicates) { } } - return canDeleteUsingMetadata(deleteExpr); + String scanBranch = + SparkTableUtil.determineReadBranch( + sparkSession(), icebergTable, branch, CaseInsensitiveStringMap.empty()); + return canDeleteUsingMetadata(deleteExpr, scanBranch); } // a metadata delete is possible iff matching files can be deleted entirely From feb9bc6e1a1613b6dc53d094a09db14e403d8c4d Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Tue, 21 Apr 2026 11:02:34 -0700 Subject: [PATCH 3/3] rebase and address comments --- .../iceberg/spark/extensions/TestDelete.java | 56 +++++++++++++++++++ .../iceberg/spark/source/SparkTable.java | 17 ++++-- .../iceberg/spark/sql/TestDeleteFrom.java | 30 ---------- 3 files changed, 68 insertions(+), 35 deletions(-) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 7e0f6207edc9..9e9d751691be 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -1424,6 +1424,62 @@ public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableEx }); } + @TestTemplate + public void testDeleteToWapBranchCanDeleteWhereScansWapBranch() throws NoSuchTableException { + assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull(); + + createAndInitPartitionedTable(); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", + tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); + + append(tableName, new Employee(1, "hr")); + + spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap"); + try { + append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr")); + + sql("DELETE FROM %s WHERE id = 1", tableName); + + assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", tableName)) + .as("DELETE should remove the matching rows from the WAP branch") + .containsExactly(row(0, "hr"), row(2, "hr")); + assertThat(sql("SELECT id, dep FROM %s.branch_main", tableName)) + .as("Main branch must not be modified by a WAP-targeted DELETE") + .containsExactly(row(1, "hr")); + } finally { + spark.conf().unset(SparkSQLProperties.WAP_BRANCH); + } + } + + @TestTemplate + public void testMetadataDeleteToWapBranchCommitsToWapBranch() throws NoSuchTableException { + assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull(); + + createAndInitPartitionedTable(); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", + tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); + + append(tableName, new Employee(1, "hr"), new Employee(5, "eng")); + + spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap"); + try { + append(tableName, new Employee(0, "hr"), new Employee(2, "eng")); + + sql("DELETE FROM %s WHERE dep = 'hr'", tableName); + + assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", tableName)) + .as("Metadata delete should remove the hr partition on the WAP branch") + .containsExactly(row(2, "eng"), row(5, "eng")); + assertThat(sql("SELECT id, dep FROM %s.branch_main ORDER BY id", tableName)) + .as("Metadata delete must not commit to main when WAP is set") + .containsExactly(row(1, "hr"), row(5, "eng")); + } finally { + spark.conf().unset(SparkSQLProperties.WAP_BRANCH); + } + } + @TestTemplate public void testDeleteWithFilterOnNestedColumn() { createAndInitNestedColumnsTable(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 47fef4072915..80a40d72c8d1 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -49,6 +49,7 @@ import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.SparkV2Filters; import org.apache.iceberg.spark.TimeTravel; @@ -210,12 +211,12 @@ public boolean canDeleteWhere(Predicate[] predicates) { String scanBranch = SparkTableUtil.determineReadBranch( - sparkSession(), icebergTable, branch, CaseInsensitiveStringMap.empty()); + spark(), table(), branch, CaseInsensitiveStringMap.empty()); return canDeleteUsingMetadata(deleteExpr, scanBranch); } // a metadata delete is possible iff matching files can be deleted entirely - private boolean canDeleteUsingMetadata(Expression deleteExpr) { + private boolean canDeleteUsingMetadata(Expression deleteExpr, String scanBranch) { boolean caseSensitive = SparkUtil.caseSensitive(spark()); if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) { @@ -230,7 +231,9 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) { .includeColumnStats() .ignoreResiduals(); - if (snapshot != null) { + if (scanBranch != null) { + scan = scan.useRef(scanBranch); + } else if (snapshot != null) { scan = scan.useSnapshot(snapshot.snapshotId()); } @@ -272,8 +275,12 @@ public void deleteWhere(Predicate[] predicates) { .set("spark.app.id", spark().sparkContext().applicationId()) .deleteFromRowFilter(deleteExpr); - if (branch != null) { - deleteFiles.toBranch(branch); + String writeBranch = + SparkTableUtil.determineWriteBranch( + spark(), table(), branch, CaseInsensitiveStringMap.empty()); + + if (writeBranch != null) { + deleteFiles.toBranch(writeBranch); } if (!CommitMetadata.commitProperties().isEmpty()) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index cdfbfa0907a6..536d568003cf 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -24,11 +24,9 @@ import java.util.List; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CatalogTestBase; -import org.apache.iceberg.spark.SparkSQLProperties; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -187,32 +185,4 @@ public void testDeleteFromTablePartitionedByVarbinary() { ImmutableList.of(row(1L, new byte[] {-29, -68, -47})), sql("SELECT * FROM %s where data = X'e3bcd1'", tableName)); } - - @TestTemplate - public void testDeleteWithWapBranch() throws NoSuchTableException { - sql( - "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES ('%s' = 'true')", - tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); - - spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1"); - try { - // all rows go into one file on the WAP branch; main stays empty - List records = - Lists.newArrayList( - new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.coalesce(1).writeTo(tableName).append(); - - // delete a subset of rows - canDeleteWhere and deleteWhere must both - // resolve the WAP branch so they scan and commit to the same branch - sql("DELETE FROM %s WHERE id = 1", tableName); - - assertEquals( - "Should have deleted only the matching row on the WAP branch", - ImmutableList.of(row(2L, "b"), row(3L, "c")), - sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName)); - } finally { - spark.conf().unset(SparkSQLProperties.WAP_BRANCH); - } - } }