diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index b55280a6537f..dd92d33cda79 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -84,7 +84,8 @@ public abstract void cleanFiles( "manifest_length", "partition_spec_id", "added_snapshot_id", - "deleted_data_files_count"); + "added_files_count", + "deleted_files_count"); protected CloseableIterable readManifests(Snapshot snapshot) { if (snapshot.manifestListLocation() != null) { diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java index 384b7132ef76..89cb242d8f9d 100644 --- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java @@ -2207,4 +2207,110 @@ private static PartitionStatisticsFile reusePartitionStatsFile( private static void commitPartitionStats(Table table, PartitionStatisticsFile statisticsFile) { table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); } + + /** + * Verifies that during incremental cleanup, manifests containing only added/existing files are + * not opened for scanning. Before the fix, the manifest list projection used the wrong field name + * "deleted_data_files_count" causing hasDeletedFiles() to always return true and every manifest + * to be unnecessarily scanned. + */ + @TestTemplate + public void testAppendOnlyManifestsNotScannedDuringCleanup() { + // This optimization is specific to incremental cleanup + assumeThat(incrementalCleanup).isTrue(); + + TestTables.LocalFileIO spyFileIO = Mockito.spy(new TestTables.LocalFileIO()); + String tableName = "testAppendOnlyManifests"; + Table testTable = + TestTables.create( + tableDir, + tableName, + SCHEMA, + SPEC, + SortOrder.unsorted(), + formatVersion, + new TestTables.TestTableOperations(tableName, tableDir, spyFileIO)); + + // Append FILE_A: manifest has added_files_count=1, deleted_files_count=0 + testTable.newAppend().appendFile(FILE_A).commit(); + Snapshot firstSnapshot = testTable.currentSnapshot(); + + // Collect manifest paths from the first snapshot (append-only manifests) + Set appendOnlyManifestPaths = + firstSnapshot.allManifests(testTable.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + + waitUntilAfter(firstSnapshot.timestampMillis()); + + // Append FILE_B + testTable.newAppend().appendFile(FILE_B).commit(); + long tAfterCommits = waitUntilAfter(testTable.currentSnapshot().timestampMillis()); + + // Clear spy interactions before expiration so we only track expiration reads + Mockito.clearInvocations(spyFileIO); + + Set deletedFiles = Sets.newHashSet(); + removeSnapshots(testTable) + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + // No data files should be deleted since all manifests only have added files + assertThat(deletedFiles) + .as("No data files should be deleted for append-only snapshots") + .noneMatch(f -> f.endsWith(".parquet")); + + // Verify that the append-only manifests were NOT opened for entry scanning + for (String manifestPath : appendOnlyManifestPaths) { + Mockito.verify(spyFileIO, Mockito.never()).newInputFile(manifestPath); + } + } + + /** + * Verifies that during incremental cleanup, manifests with deleted entries are properly scanned + * and the deleted data files are cleaned up. + */ + @TestTemplate + public void testManifestsWithDeletesScannedDuringCleanup() { + // This optimization is specific to incremental cleanup + assumeThat(incrementalCleanup).isTrue(); + + TestTables.LocalFileIO spyFileIO = Mockito.spy(new TestTables.LocalFileIO()); + String tableName = "testManifestsWithDeletes"; + Table testTable = + TestTables.create( + tableDir, + tableName, + SCHEMA, + SPEC, + SortOrder.unsorted(), + formatVersion, + new TestTables.TestTableOperations(tableName, tableDir, spyFileIO)); + + // Append FILE_A and FILE_B + testTable.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + waitUntilAfter(testTable.currentSnapshot().timestampMillis()); + + // Delete FILE_A: creates a manifest with deleted_files_count > 0 + testTable.newDelete().deleteFile(FILE_A).commit(); + + waitUntilAfter(testTable.currentSnapshot().timestampMillis()); + + // Append FILE_C to keep the table active with a newer snapshot + testTable.newAppend().appendFile(FILE_C).commit(); + long tAfterCommits = waitUntilAfter(testTable.currentSnapshot().timestampMillis()); + + Set deletedFiles = Sets.newHashSet(); + removeSnapshots(testTable) + .expireOlderThan(tAfterCommits) + .deleteWith(deletedFiles::add) + .commit(); + + // FILE_A should be deleted since it was removed in an expired ancestor snapshot + assertThat(deletedFiles) + .as("Deleted data file should be cleaned up during expiration") + .contains(FILE_A.location()); + } }