diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 28a45d2c7821..b1ed0640c58d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -184,7 +184,8 @@ private void cacheManifests(FileIO fileIO) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list this.allManifests = ManifestLists.read( - fileIO.newInputFile(new BaseManifestListFile(manifestListLocation, keyId))); + ManifestFiles.newInputFile( + fileIO, new BaseManifestListFile(manifestListLocation, keyId))); } if (dataManifests == null || deleteManifests == null) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index 5ac55f0cf41f..d2fb859a2110 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -562,6 +562,15 @@ private static ManifestFile copyManifestInternal( return writer.toManifestFile(); } + static InputFile newInputFile(FileIO io, ManifestListFile manifestList) { + InputFile input = io.newInputFile(manifestList); + if (cachingEnabled(io)) { + return contentCache(io).tryCache(input); + } + + return input; + } + private static InputFile newInputFile(FileIO io, ManifestFile manifest) { InputFile input = io.newInputFile(manifest); if (cachingEnabled(io)) { diff --git a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java index 57acb7d92d05..a321d9092449 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java @@ -75,18 +75,20 @@ public void testPlanWithCache() throws Exception { TableScan scan1 = table.newScan().option(TableProperties.SPLIT_SIZE, String.valueOf(8 * 1024 * 1024)); assertThat(scan1.planTasks()).hasSize(numFiles * 2); + // each append commit caches one manifest and one manifest list: parent manifest lists are + // read while committing, and the current snapshot's manifest list is read while planning assertThat(cache.estimatedCacheSize()) - .as("All manifest files should be cached") - .isEqualTo(numFiles); + .as("All manifest files and manifest lists should be cached") + .isEqualTo(numFiles * 2); assertThat(cache.stats().loadSuccessCount()) - .as("All manifest files should be recently loaded") - .isEqualTo(numFiles); + .as("All manifest files and manifest lists should be recently loaded") + .isEqualTo(numFiles * 2); long missCount = ManifestFiles.contentCacheStats(table.io()).missCount(); // planFiles and verify that cache size still the same TableScan scan2 = table.newScan(); assertThat(scan2.planFiles()).hasSize(numFiles); - assertThat(cache.estimatedCacheSize()).isEqualTo(numFiles); + assertThat(cache.estimatedCacheSize()).isEqualTo(numFiles * 2); assertThat(ManifestFiles.contentCacheStats(table.io()).missCount()) .as("All manifest file reads should hit cache") .isEqualTo(missCount); @@ -94,6 +96,43 @@ public void testPlanWithCache() throws Exception { ManifestFiles.dropCache(table.io()); } + @Test + public void testManifestListCaching() throws Exception { + Map properties = + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + HadoopFileIO.class.getName(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + "true"); + Table table = createTable(properties); + appendFiles(newFiles(1, 1024), table); + String metadataLocation = + ((HasTableOperations) table).operations().current().metadataFileLocation(); + + // start from an empty cache because the commit above may have already populated it + ManifestFiles.dropCache(table.io()); + ContentCache cache = ManifestFiles.contentCache(table.io()); + assertThat(cache.estimatedCacheSize()).isEqualTo(0); + + // a freshly parsed snapshot reads the manifest list through the content cache + TableMetadata firstMetadata = TableMetadataParser.read(table.io(), metadataLocation); + assertThat(firstMetadata.currentSnapshot().allManifests(table.io())).hasSize(1); + assertThat(cache.estimatedCacheSize()).as("Manifest list should be cached").isEqualTo(1); + assertThat(ManifestFiles.contentCacheStats(table.io()).missCount()).isEqualTo(1); + long hits = ManifestFiles.contentCacheStats(table.io()).hitCount(); + + // a new snapshot instance reading the same manifest list is served from the cache + TableMetadata secondMetadata = TableMetadataParser.read(table.io(), metadataLocation); + assertThat(secondMetadata.currentSnapshot().allManifests(table.io())).hasSize(1); + assertThat(cache.estimatedCacheSize()).isEqualTo(1); + assertThat(ManifestFiles.contentCacheStats(table.io()).missCount()) + .as("Manifest list read should be served from the cache") + .isEqualTo(1); + assertThat(ManifestFiles.contentCacheStats(table.io()).hitCount()).isGreaterThan(hits); + + ManifestFiles.dropCache(table.io()); + } + @Test public void testPlanWithSmallCache() throws Exception { Map properties =