Spark: fix delete from branch for canDeleteWhere where it does not resolve to the correct branch#15512
Spark: fix delete from branch for canDeleteWhere where it does not resolve to the correct branch#15512yingjianwu98 wants to merge 3 commits intoapache:mainfrom
Conversation
|
From Netflix discussion, this sounds like a correctness bug and release blocker for 1.11. |
|
|
||
| spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1"); | ||
| try { | ||
| // all rows go into one file on the WAP branch; main stays empty |
There was a problem hiding this comment.
can we also insert some rows/files into the main branch first? ideally with a row of matching the predicate of id=1 .
| // resolve the WAP branch so they scan and commit to the same branch | ||
| sql("DELETE FROM %s WHERE id = 1", tableName); | ||
|
|
||
| assertEquals( |
There was a problem hiding this comment.
use assertj
assertThat(sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName))
.containsExactlyInAnyOrder(...)
|
@yingjianwu98 can you resolve the merge conflict? |
amogh-jahagirdar
left a comment
There was a problem hiding this comment.
Thanks @yingjianwu98 for the fix, this does look right to me just had some comments on the tests.
|
|
||
| spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1"); | ||
| try { | ||
| // all rows go into one file on the WAP branch; main stays empty |
| // resolve the WAP branch so they scan and commit to the same branch | ||
| sql("DELETE FROM %s WHERE id = 1", tableName); | ||
|
|
||
| assertEquals( |
There was a problem hiding this comment.
I would also add an assertion for the main state as well
|
Thanks for the review! I will rebase today or tomorrow and ping back here. |
|
@yingjianwu98 this is marked a blocker for 1.11.0 release. please ping here when the rebase and comments are addressed. |
ac87dce to
feb9bc6
Compare
|
Apologize for the delay here. I have addressed the comments, and want to callout the following:
|
Problem
When WAP (Write-Audit-Publish) is enabled via spark.wap.branch, canDeleteWhere() and deleteWhere() scan different branches:
This causes canDeleteWhere() to incorrectly return true (metadata-only delete is possible) based on main's data, while deleteWhere() commits to the
WAP branch where the file has partial matches, resulting in:
ValidationException: Cannot delete file where some, but not all, rows match filter
Example
-- WAP enabled, spark.wap.branch = dev1
INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c'); -- goes to dev1, main is empty
DELETE FROM t WHERE id = 1;
-- canDeleteWhere scans main (empty) → true → metadata delete
-- deleteWhere commits to dev1 → partial match → ValidationException
Fix
because the scan is a read operation, and determineReadBranch correctly handles the case where the WAP branch doesn't exist yet by falling back to
main.
reads and other operations that share the field.
Will work on the backport to other Spark versions once there is consensus from the community.