diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 7e0f6207edc9..9e9d751691be 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1424,6 +1424,62 @@ public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableEx
         });
   }
 
+  @TestTemplate
+  public void testDeleteToWapBranchCanDeleteWhereScansWapBranch() throws NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+      sql("DELETE FROM %s WHERE id = 1", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", tableName))
+          .as("DELETE should remove the matching rows from the WAP branch")
+          .containsExactly(row(0, "hr"), row(2, "hr"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main", tableName))
+          .as("Main branch must not be modified by a WAP-targeted DELETE")
+          .containsExactly(row(1, "hr"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
+  @TestTemplate
+  public void testMetadataDeleteToWapBranchCommitsToWapBranch() throws NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"), new Employee(5, "eng"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(2, "eng"));
+
+      sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", tableName))
+          .as("Metadata delete should remove the hr partition on the WAP branch")
+          .containsExactly(row(2, "eng"), row(5, "eng"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main ORDER BY id", tableName))
+          .as("Metadata delete must not commit to main when WAP is set")
+          .containsExactly(row(1, "hr"), row(5, "eng"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
   @TestTemplate
   public void testDeleteWithFilterOnNestedColumn() {
     createAndInitNestedColumnsTable();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 07db8c4ed3fe..80a40d72c8d1 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkV2Filters;
 import org.apache.iceberg.spark.TimeTravel;
@@ -208,11 +209,14 @@ public boolean canDeleteWhere(Predicate[] predicates) {
       }
     }
 
-    return canDeleteUsingMetadata(deleteExpr);
+    String scanBranch =
+        SparkTableUtil.determineReadBranch(
+            spark(), table(), branch, CaseInsensitiveStringMap.empty());
+    return canDeleteUsingMetadata(deleteExpr, scanBranch);
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
-  private boolean canDeleteUsingMetadata(Expression deleteExpr) {
+  private boolean canDeleteUsingMetadata(Expression deleteExpr, String scanBranch) {
     boolean caseSensitive = SparkUtil.caseSensitive(spark());
 
     if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
@@ -227,7 +231,9 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) {
             .includeColumnStats()
             .ignoreResiduals();
 
-    if (snapshot != null) {
+    if (scanBranch != null) {
+      scan = scan.useRef(scanBranch);
+    } else if (snapshot != null) {
       scan = scan.useSnapshot(snapshot.snapshotId());
     }
 
@@ -269,8 +275,12 @@ public void deleteWhere(Predicate[] predicates) {
             .set("spark.app.id", spark().sparkContext().applicationId())
            .deleteFromRowFilter(deleteExpr);
 
-    if (branch != null) {
-      deleteFiles.toBranch(branch);
+    String writeBranch =
+        SparkTableUtil.determineWriteBranch(
+            spark(), table(), branch, CaseInsensitiveStringMap.empty());
+
+    if (writeBranch != null) {
+      deleteFiles.toBranch(writeBranch);
     }
 
     if (!CommitMetadata.commitProperties().isEmpty()) {