Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,62 @@ public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableEx
});
}

@TestTemplate
public void testDeleteToWapBranchCanDeleteWhereScansWapBranch() throws NoSuchTableException {
  assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull();

  createAndInitPartitionedTable();
  sql(
      "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
      tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);

  // Seed the main branch with one row before the WAP session branch is active.
  append(tableName, new Employee(1, "hr"));

  spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
  try {
    // Rows written while WAP is enabled land on the "wap" branch, not on main.
    append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));

    sql("DELETE FROM %s WHERE id = 1", tableName);

    // The delete must have been planned and committed against the WAP branch.
    var wapRows = sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", tableName);
    assertThat(wapRows)
        .as("DELETE should remove the matching rows from the WAP branch")
        .containsExactly(row(0, "hr"), row(2, "hr"));

    // Main still holds only the pre-WAP seed row.
    var mainRows = sql("SELECT id, dep FROM %s.branch_main", tableName);
    assertThat(mainRows)
        .as("Main branch must not be modified by a WAP-targeted DELETE")
        .containsExactly(row(1, "hr"));
  } finally {
    spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
  }
}

@TestTemplate
public void testMetadataDeleteToWapBranchCommitsToWapBranch() throws NoSuchTableException {
  assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull();

  createAndInitPartitionedTable();
  sql(
      "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
      tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);

  // Seed main with one row per partition before enabling WAP.
  append(tableName, new Employee(1, "hr"), new Employee(5, "eng"));

  spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
  try {
    // These rows are written to the WAP branch only.
    append(tableName, new Employee(0, "hr"), new Employee(2, "eng"));

    // Partition-aligned predicate: eligible for a metadata-only delete.
    sql("DELETE FROM %s WHERE dep = 'hr'", tableName);

    // All 'hr' rows disappear from the WAP branch.
    var wapRows = sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", tableName);
    assertThat(wapRows)
        .as("Metadata delete should remove the hr partition on the WAP branch")
        .containsExactly(row(2, "eng"), row(5, "eng"));

    // Main keeps its original rows, including the 'hr' partition.
    var mainRows = sql("SELECT id, dep FROM %s.branch_main ORDER BY id", tableName);
    assertThat(mainRows)
        .as("Metadata delete must not commit to main when WAP is set")
        .containsExactly(row(1, "hr"), row(5, "eng"));
  } finally {
    spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
  }
}

@TestTemplate
public void testDeleteWithFilterOnNestedColumn() {
createAndInitNestedColumnsTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.spark.TimeTravel;
Expand Down Expand Up @@ -208,11 +209,14 @@ public boolean canDeleteWhere(Predicate[] predicates) {
}
}

return canDeleteUsingMetadata(deleteExpr);
String scanBranch =
SparkTableUtil.determineReadBranch(
spark(), table(), branch, CaseInsensitiveStringMap.empty());
return canDeleteUsingMetadata(deleteExpr, scanBranch);
}

// a metadata delete is possible iff matching files can be deleted entirely
private boolean canDeleteUsingMetadata(Expression deleteExpr) {
private boolean canDeleteUsingMetadata(Expression deleteExpr, String scanBranch) {
boolean caseSensitive = SparkUtil.caseSensitive(spark());

if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
Expand All @@ -227,7 +231,9 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) {
.includeColumnStats()
.ignoreResiduals();

if (snapshot != null) {
if (scanBranch != null) {
scan = scan.useRef(scanBranch);
} else if (snapshot != null) {
scan = scan.useSnapshot(snapshot.snapshotId());
}

Expand Down Expand Up @@ -269,8 +275,12 @@ public void deleteWhere(Predicate[] predicates) {
.set("spark.app.id", spark().sparkContext().applicationId())
.deleteFromRowFilter(deleteExpr);

if (branch != null) {
deleteFiles.toBranch(branch);
String writeBranch =
SparkTableUtil.determineWriteBranch(
spark(), table(), branch, CaseInsensitiveStringMap.empty());

if (writeBranch != null) {
deleteFiles.toBranch(writeBranch);
}

if (!CommitMetadata.commitProperties().isEmpty()) {
Expand Down
Loading