Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ private void cacheManifests(FileIO fileIO) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
this.allManifests =
ManifestLists.read(
fileIO.newInputFile(new BaseManifestListFile(manifestListLocation, keyId)));
ManifestFiles.newInputFile(
fileIO, new BaseManifestListFile(manifestListLocation, keyId)));
}

if (dataManifests == null || deleteManifests == null) {
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,15 @@ private static ManifestFile copyManifestInternal(
return writer.toManifestFile();
}

static InputFile newInputFile(FileIO io, ManifestListFile manifestList) {
InputFile input = io.newInputFile(manifestList);
if (cachingEnabled(io)) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if its good practice to add a new knob here or use the existing one

return contentCache(io).tryCache(input);
}

return input;
}

private static InputFile newInputFile(FileIO io, ManifestFile manifest) {
InputFile input = io.newInputFile(manifest);
if (cachingEnabled(io)) {
Expand Down
49 changes: 44 additions & 5 deletions core/src/test/java/org/apache/iceberg/TestManifestCaching.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,64 @@ public void testPlanWithCache() throws Exception {
TableScan scan1 =
table.newScan().option(TableProperties.SPLIT_SIZE, String.valueOf(8 * 1024 * 1024));
assertThat(scan1.planTasks()).hasSize(numFiles * 2);
// each append commit caches one manifest and one manifest list: parent manifest lists are
// read while committing, and the current snapshot's manifest list is read while planning
assertThat(cache.estimatedCacheSize())
.as("All manifest files should be cached")
.isEqualTo(numFiles);
.as("All manifest files and manifest lists should be cached")
.isEqualTo(numFiles * 2);
assertThat(cache.stats().loadSuccessCount())
.as("All manifest files should be recently loaded")
.isEqualTo(numFiles);
.as("All manifest files and manifest lists should be recently loaded")
.isEqualTo(numFiles * 2);
long missCount = ManifestFiles.contentCacheStats(table.io()).missCount();

// planFiles and verify that cache size still the same
TableScan scan2 = table.newScan();
assertThat(scan2.planFiles()).hasSize(numFiles);
assertThat(cache.estimatedCacheSize()).isEqualTo(numFiles);
assertThat(cache.estimatedCacheSize()).isEqualTo(numFiles * 2);
assertThat(ManifestFiles.contentCacheStats(table.io()).missCount())
.as("All manifest file reads should hit cache")
.isEqualTo(missCount);

ManifestFiles.dropCache(table.io());
}

@Test
public void testManifestListCaching() throws Exception {
Map<String, String> properties =
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL,
HadoopFileIO.class.getName(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
"true");
Table table = createTable(properties);
appendFiles(newFiles(1, 1024), table);
String metadataLocation =
((HasTableOperations) table).operations().current().metadataFileLocation();

// start from an empty cache because the commit above may have already populated it
ManifestFiles.dropCache(table.io());
ContentCache cache = ManifestFiles.contentCache(table.io());
assertThat(cache.estimatedCacheSize()).isEqualTo(0);

// a freshly parsed snapshot reads the manifest list through the content cache
TableMetadata firstMetadata = TableMetadataParser.read(table.io(), metadataLocation);
assertThat(firstMetadata.currentSnapshot().allManifests(table.io())).hasSize(1);
assertThat(cache.estimatedCacheSize()).as("Manifest list should be cached").isEqualTo(1);
assertThat(ManifestFiles.contentCacheStats(table.io()).missCount()).isEqualTo(1);
long hits = ManifestFiles.contentCacheStats(table.io()).hitCount();

// a new snapshot instance reading the same manifest list is served from the cache
TableMetadata secondMetadata = TableMetadataParser.read(table.io(), metadataLocation);
assertThat(secondMetadata.currentSnapshot().allManifests(table.io())).hasSize(1);
assertThat(cache.estimatedCacheSize()).isEqualTo(1);
assertThat(ManifestFiles.contentCacheStats(table.io()).missCount())
.as("Manifest list read should be served from the cache")
.isEqualTo(1);
assertThat(ManifestFiles.contentCacheStats(table.io()).hitCount()).isGreaterThan(hits);

ManifestFiles.dropCache(table.io());
}

@Test
public void testPlanWithSmallCache() throws Exception {
Map<String, String> properties =
Expand Down
Loading