Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ public void removingDataFileByExpressionAlsoRemovesDV() {
.containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "2");

assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
assertThat(deleteSnap.summary())
.containsEntry(SnapshotSummary.REMOVED_DVS_PROP, "1")
.containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "1");

validateDeleteManifest(
deleteSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L, 1L),
Expand Down Expand Up @@ -658,6 +662,10 @@ public void removingDataFileByPathAlsoRemovesDV() {
.containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "2");

assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
assertThat(deleteSnap.summary())
.containsEntry(SnapshotSummary.REMOVED_DVS_PROP, "1")
.containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "1");

validateDeleteManifest(
deleteSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L, 1L),
Expand All @@ -667,6 +675,69 @@ public void removingDataFileByPathAlsoRemovesDV() {
statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
}

@TestTemplate
public void removingDataFilesWhenTruncatingAlsoRemovesDVs() {
  // DVs only exist in format version 3 and later.
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

  // One DV per data file, both in partition data_bucket=0.
  DeleteFile dv1 = newDV("/path/to/data-1-deletes.puffin", DATA_FILE_BUCKET_0_IDS_0_2);
  DeleteFile dv2 = newDV("/path/to/data-2-deletes.puffin", DATA_FILE_BUCKET_0_IDS_8_10);

  commit(
      table,
      table
          .newRowDelta()
          .addRows(DATA_FILE_BUCKET_0_IDS_0_2)
          .addRows(DATA_FILE_BUCKET_0_IDS_8_10)
          .addDeletes(dv1)
          .addDeletes(dv2),
      branch);

  Snapshot snapshot = latestSnapshot(table, branch);
  assertThat(snapshot.sequenceNumber()).isEqualTo(1);
  assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1);

  // A TRUNCATE is issued as a delete with Expressions.alwaysTrue(); removing every data file
  // by row filter must also drop the DVs that reference those files.
  commit(table, table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()), branch);

  Snapshot deleteSnap = latestSnapshot(table, branch);
  assertThat(deleteSnap.sequenceNumber()).isEqualTo(2);
  assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(2);

  assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
  assertThat(deleteSnap.summary())
      .containsEntry(SnapshotSummary.REMOVED_DVS_PROP, "2")
      .containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "2");

  validateDeleteManifest(
      deleteSnap.deleteManifests(table.io()).get(0),
      dataSeqs(1L, 1L),
      fileSeqs(1L, 1L),
      ids(deleteSnap.snapshotId(), deleteSnap.snapshotId()),
      files(dv1, dv2),
      statuses(Status.DELETED, Status.DELETED));
}

/**
 * Builds a DV (deletion vector) in partition data_bucket=0 that references the given data file.
 * Sizes and offsets are arbitrary fixed values; only the path and referenced file differ per DV.
 */
private static DeleteFile newDV(String path, DataFile referencedDataFile) {
  return FileMetadata.deleteFileBuilder(SPEC)
      .ofPositionDeletes()
      .withPath(path)
      .withFileSizeInBytes(10)
      .withPartitionPath("data_bucket=0")
      .withRecordCount(5)
      .withReferencedDataFile(referencedDataFile.location())
      .withContentOffset(4)
      .withContentSizeInBytes(6)
      .build();
}

/** Encodes {@code value} as 8 little-endian bytes; the absolute put leaves position at 0. */
private static ByteBuffer longToBuffer(long value) {
  ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
  buffer.order(ByteOrder.LITTLE_ENDIAN);
  buffer.putLong(0, value);
  return buffer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.List;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -185,4 +186,41 @@ public void testDeleteFromTablePartitionedByVarbinary() {
ImmutableList.of(row(1L, new byte[] {-29, -68, -47})),
sql("SELECT * FROM %s where data = X'e3bcd1'", tableName));
}

@TestTemplate
public void truncateWithDVs() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg TBLPROPERTIES ('format-version'='3','write.delete.mode'='merge-on-read')",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to parameterize format version to 3 and above?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it didn't seem worth it to me to parameterize the entire test suite in order to run it across multiple format versions, as that adds quite a bit of execution overhead. But I can update it if people think it's worth it

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to parameterize unless it's an egregious amount of test time (which I think we should look into separately if that's the case). From a local run on my Mac, it takes ~20 seconds. I'm mostly looking at it from the perspective that once we make v4 commit path changes, which are structurally different, we still want to make sure the integration behavior doesn't regress in any manner.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for me though, just my 2c.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and did the parameterization at the class level in #16098 so that we run all tests with v2+

tableName);
List<SimpleRecord> records =
ImmutableList.of(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
df.coalesce(1).writeTo(tableName).append();

assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
.containsExactly(row(1L, "a"), row(2L, "b"), row(3L, "c"));

sql("DELETE FROM %s WHERE id = 1", tableName);
assertThat(validationCatalog.loadTable(tableIdent).currentSnapshot().summary())
.containsEntry(SnapshotSummary.ADDED_DVS_PROP, "1")
.containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, "1");

sql("DELETE FROM %s WHERE id = 2", tableName);
// DVs have been merged into single file
assertThat(validationCatalog.loadTable(tableIdent).currentSnapshot().summary())
.containsEntry(SnapshotSummary.ADDED_DVS_PROP, "1")
.containsEntry(SnapshotSummary.REMOVED_DVS_PROP, "1")
.containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, "2");

assertThat(sql("SELECT * FROM %s ORDER BY id", tableName)).containsExactly(row(3L, "c"));

sql("TRUNCATE TABLE %s", tableName);
assertThat(validationCatalog.loadTable(tableIdent).currentSnapshot().summary())
.containsEntry(SnapshotSummary.REMOVED_DVS_PROP, "1")
.containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "1")
.containsEntry(SnapshotSummary.REMOVED_POS_DELETES_PROP, "2");

assertThat(sql("SELECT * FROM %s ORDER BY id", tableName)).isEmpty();
}
}
Loading