diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java new file mode 100644 index 000000000000..4dff80477a49 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.client.builder.GetTableProjectionsSpecBuilder; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.thrift.TException; + +import java.util.Collections; +import java.util.List; + +/** + * Fetches the location of a given metadata table. + *

<p>Since the location mutates with each transaction, this allows determining if a cached version of the + * table is the latest known in the HMS database.
+ */ +public class MetadataLocator { + private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MetadataLocator.class); + static final GetProjectionsSpec PARAM_SPEC = new GetTableProjectionsSpecBuilder() + .includeParameters() // only fetches table.parameters + .build(); + final HiveCatalog catalog; + + public MetadataLocator(HiveCatalog catalog) { + this.catalog = catalog; + } + + /** + * Returns the location of the metadata table identified by the given identifier, or null if the table does not exist or is not a metadata table. + *

<p>This uses the Thrift API to fetch the table parameters, which is more efficient than fetching the entire table object.
+ * @param identifier the identifier of the metadata table to fetch the location for + * @return the location of the metadata table, or null if the table does not exist or is not a metadata table + */ + public String getLocation(TableIdentifier identifier) { + final ClientPool clients = catalog.clientPool(); + final String catName = catalog.name(); + final TableIdentifier baseTableIdentifier; + if (!catalog.isValidIdentifier(identifier)) { + if (!isValidMetadataIdentifier(identifier)) { + return null; + } else { + baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels()); + } + } else { + baseTableIdentifier = identifier; + } + String database = baseTableIdentifier.namespace().level(0); + String tableName = baseTableIdentifier.name(); + try { + List tables = clients.run( + client -> client.getTables(catName, database, Collections.singletonList(tableName), PARAM_SPEC) + ); + return tables == null || tables.isEmpty() + ? null + : tables.getFirst().getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + } catch (NoSuchTableException | NoSuchObjectException e) { + LOGGER.info("Table not found {}", baseTableIdentifier, e); + } catch (TException e) { + LOGGER.info("Table parameters fetch failed {}", baseTableIdentifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Interrupted in call to check table existence of {}", baseTableIdentifier, e); + } + return null; + } + + private boolean isValidMetadataIdentifier(TableIdentifier identifier) { + return MetadataTableType.from(identifier.name()) != null && catalog.isValidIdentifier(TableIdentifier.of(identifier.namespace().levels())); + } +} diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java index edb5fbd41a9b..3faffb8a0785 100644 --- 
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java @@ -20,11 +20,23 @@ package org.apache.iceberg.rest; import com.github.benmanes.caffeine.cache.Ticker; + +import java.lang.ref.SoftReference; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.CachingCatalog; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -32,59 +44,241 @@ import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; - +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { - private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + private static SoftReference CACHE = new SoftReference<>(null); + @TestOnly + public static C getLatestCache(Function extractor) { + HMSCachingCatalog cache = CACHE.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + protected final HiveCatalog hiveCatalog; + // Metrics counters + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true, null); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive, Configuration conf) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", false)) { + CACHE = new SoftReference<>(this); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get()); + } + + /** + * Callback when cache loads a table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get()); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get()); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get()); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { + cacheMetaLoadCount.incrementAndGet(); + LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get()); + } + + // Getter methods for accessing metrics + public long getCacheHitCount() { + return cacheHitCount.get(); } + public long getCacheMissCount() { + return cacheMissCount.get(); + } + + public long getCacheLoadCount() { + return cacheLoadCount.get(); + } + + public long getCacheInvalidateCount() { + return cacheInvalidateCount.get(); + } + + public long getCacheMetaLoadCount() { + return cacheMetaLoadCount.get(); + } + + public double getCacheHitRate() { + long hits = cacheHitCount.get(); + long total = hits + cacheMissCount.get(); + return total == 0 ? 0.0 : (double) hits / total; + } + + /** + * Generates a map of this cache's performance metrics, including hit count, + * miss count, load count, invalidate count, meta-load count, and hit rate. + * This can be used for monitoring and debugging purposes to understand the effectiveness of the cache. 
+ * @return a map of cache performance metrics + */ + public Map cacheStats() { + return Map.of( + "hit", getCacheHitCount(), + "miss", getCacheMissCount(), + "load", getCacheLoadCount(), + "invalidate", getCacheInvalidateCount(), + "metaload", getCacheMetaLoadCount(), + "hit-rate", getCacheHitRate() + ); + } + + @Override - public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { - return hiveCatalog.buildTable(identifier, schema); + public void createNamespace(Namespace namespace, Map map) { + hiveCatalog.createNamespace(namespace, map); } @Override - public void createNamespace(Namespace nmspc, Map map) { - hiveCatalog.createNamespace(nmspc, map); + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return hiveCatalog.listNamespaces(namespace); } @Override - public List listNamespaces(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.listNamespaces(nmspc); + public Table loadTable(final TableIdentifier identifier) { + final TableIdentifier canonicalized = identifier.toLowerCase(); + final Table cachedTable = tableCache.getIfPresent(canonicalized); + if (cachedTable != null) { + final String location = new MetadataLocator(hiveCatalog).getLocation(canonicalized); + if (location == null) { + LOG.debug("Table {} has no location, returning cached table without location", canonicalized); + } else { + String cachedLocation = cachedTable instanceof HasTableOperations tableOps + ? 
tableOps.operations().current().metadataFileLocation() + : null; + if (!location.equals(cachedLocation)) { + LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location); + // Invalidate the cached table if the location is different + invalidateTable(canonicalized); + onCacheInvalidate(canonicalized); + } else { + onCacheHit(canonicalized); + return cachedTable; + } + } + } else { + onCacheMiss(canonicalized); + } + final Table table = tableCache.get(canonicalized, this::loadTableWithoutCache); + if (table instanceof BaseMetadataTable) { + // Cache underlying table + TableIdentifier originTableIdentifier = + TableIdentifier.of(canonicalized.namespace().levels()); + Table originTable = tableCache.get(originTableIdentifier, this::loadTableWithoutCache); + // Share TableOperations instance of origin table for all metadata tables, so that metadata + // table instances are refreshed as well when origin table instance is refreshed. + if (originTable instanceof HasTableOperations tableOps) { + TableOperations ops = tableOps.operations(); + MetadataTableType type = MetadataTableType.from(canonicalized.name()); + Table metadataTable = + MetadataTableUtils.createMetadataTableInstance( + ops, hiveCatalog.name(), originTableIdentifier, canonicalized, type); + tableCache.put(canonicalized, metadataTable); + onCacheMetaLoad(canonicalized); + LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier); + // Return the metadata table instead of the original table + return metadataTable; + } + } + onCacheLoad(canonicalized); + return table; + } + + private Table loadTableWithoutCache(TableIdentifier identifier) { + try { + return hiveCatalog.loadTable(identifier); + } catch (NoSuchTableException exception) { + return null; + } } @Override - public Map loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.loadNamespaceMetadata(nmspc); + public Map 
loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + return hiveCatalog.loadNamespaceMetadata(namespace); } @Override - public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException { - List tables = listTables(nmspc); + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + List tables = listTables(namespace); for (TableIdentifier ident : tables) { invalidateTable(ident); } - return hiveCatalog.dropNamespace(nmspc); + return hiveCatalog.dropNamespace(namespace); } @Override - public boolean setProperties(Namespace nmspc, Map map) throws NoSuchNamespaceException { - return hiveCatalog.setProperties(nmspc, map); + public boolean setProperties(Namespace namespace, Map map) throws NoSuchNamespaceException { + return hiveCatalog.setProperties(namespace, map); } @Override - public boolean removeProperties(Namespace nmspc, Set set) throws NoSuchNamespaceException { - return hiveCatalog.removeProperties(nmspc, set); + public boolean removeProperties(Namespace namespace, Set set) throws NoSuchNamespaceException { + return hiveCatalog.removeProperties(namespace, set); } @Override @@ -92,6 +286,11 @@ public boolean namespaceExists(Namespace namespace) { return hiveCatalog.namespaceExists(namespace); } + @Override + public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return hiveCatalog.buildTable(identifier, schema); + } + @Override public List listViews(Namespace namespace) { return hiveCatalog.listViews(namespace); diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java index 73d23ae5daf0..5d8ac7584034 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java +++ 
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -69,6 +70,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.HMSCacheStatsResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -78,6 +80,7 @@ * Adaptor class to translate REST requests into {@link Catalog} API calls. */ public class HMSCatalogAdapter implements RESTClient { + private static final String V1_CACHE_STATS = "v1/cache/stats"; private static final Splitter SLASH = Splitter.on('/'); private static final Map, Integer> EXCEPTION_ERROR_CODES = @@ -136,7 +139,8 @@ enum Route { CREATE_VIEW(HTTPMethod.POST, ResourcePaths.V1_VIEWS, CreateViewRequest.class), UPDATE_VIEW(HTTPMethod.POST, ResourcePaths.V1_VIEW, UpdateTableRequest.class), RENAME_VIEW(HTTPMethod.POST, ResourcePaths.V1_VIEW_RENAME, RenameTableRequest.class), - DROP_VIEW(HTTPMethod.DELETE, ResourcePaths.V1_VIEW); + DROP_VIEW(HTTPMethod.DELETE, ResourcePaths.V1_VIEW), + CACHE_STATS(HTTPMethod.GET, V1_CACHE_STATS); private final HTTPMethod method; private final int requiredLength; @@ -210,6 +214,14 @@ public Class requestClass() { } } + private HMSCacheStatsResponse cacheStats() { + Map stats = Collections.emptyMap(); + if (catalog instanceof HMSCachingCatalog hmsCatalog) { + stats = hmsCatalog.cacheStats(); + } + return castResponse(HMSCacheStatsResponse.class, new HMSCacheStatsResponse(stats)); + } + private ConfigResponse config() { final List endpoints = Arrays.stream(Route.values()) .map(r -> 
Endpoint.create(r.method.name(), r.resourcePath)).toList(); @@ -409,82 +421,34 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest @SuppressWarnings({"MethodLength", "unchecked"}) private T handleRequest( Route route, Map vars, Object body) { - switch (route) { - case CONFIG: - return (T) config(); - - case LIST_NAMESPACES: - return (T) listNamespaces(vars); - - case CREATE_NAMESPACE: - return (T) createNamespace(body); - - case NAMESPACE_EXISTS: - return (T) namespaceExists(vars); - - case LOAD_NAMESPACE: - return (T) loadNamespace(vars); - - case DROP_NAMESPACE: - return (T) dropNamespace(vars); - - case UPDATE_NAMESPACE: - return (T) updateNamespace(vars, body); - - case LIST_TABLES: - return (T) listTables(vars); - - case CREATE_TABLE: - return (T) createTable(vars, body); - - case DROP_TABLE: - return (T) dropTable(vars); - - case TABLE_EXISTS: - return (T) tableExists(vars); - - case LOAD_TABLE: - return (T) loadTable(vars); - - case REGISTER_TABLE: - return (T) registerTable(vars, body); - - case UPDATE_TABLE: - return (T) updateTable(vars, body); - - case RENAME_TABLE: - return (T) renameTable(body); - - case REPORT_METRICS: - return (T) reportMetrics(body); - - case COMMIT_TRANSACTION: - return (T) commitTransaction(body); - - case LIST_VIEWS: - return (T) listViews(vars); - - case CREATE_VIEW: - return (T) createView(vars, body); - - case VIEW_EXISTS: - return (T) viewExists(vars); - - case LOAD_VIEW: - return (T) loadView(vars); - - case UPDATE_VIEW: - return (T) updateView(vars, body); - - case RENAME_VIEW: - return (T) renameView(body); - - case DROP_VIEW: - return (T) dropView(vars); - - default: - } - return null; + return switch (route) { + case CONFIG -> (T) config(); + case LIST_NAMESPACES -> (T) listNamespaces(vars); + case CREATE_NAMESPACE -> (T) createNamespace(body); + case NAMESPACE_EXISTS -> (T) namespaceExists(vars); + case LOAD_NAMESPACE -> (T) loadNamespace(vars); + case DROP_NAMESPACE -> (T) 
dropNamespace(vars); + case UPDATE_NAMESPACE -> (T) updateNamespace(vars, body); + case LIST_TABLES -> (T) listTables(vars); + case CREATE_TABLE -> (T) createTable(vars, body); + case DROP_TABLE -> (T) dropTable(vars); + case TABLE_EXISTS -> (T) tableExists(vars); + case LOAD_TABLE -> (T) loadTable(vars); + case REGISTER_TABLE -> (T) registerTable(vars, body); + case UPDATE_TABLE -> (T) updateTable(vars, body); + case RENAME_TABLE -> (T) renameTable(body); + case REPORT_METRICS -> (T) reportMetrics(body); + case COMMIT_TRANSACTION -> (T) commitTransaction(body); + case LIST_VIEWS -> (T) listViews(vars); + case CREATE_VIEW -> (T) createView(vars, body); + case VIEW_EXISTS -> (T) viewExists(vars); + case LOAD_VIEW -> (T) loadView(vars); + case UPDATE_VIEW -> (T) updateView(vars, body); + case RENAME_VIEW -> (T) renameView(body); + case DROP_VIEW -> (T) dropView(vars); + case CACHE_STATS -> (T) cacheStats(); + default -> null; + }; } @@ -607,11 +571,15 @@ public static T castResponse(Class responseType, Obj public static void configureResponseFromException( Exception exc, ErrorResponse.Builder errorBuilder) { + int errorCode = EXCEPTION_ERROR_CODES.getOrDefault(exc.getClass(), 500); errorBuilder - .responseCode(EXCEPTION_ERROR_CODES.getOrDefault(exc.getClass(), 500)) + .responseCode(errorCode) .withType(exc.getClass().getSimpleName()) - .withMessage(exc.getMessage()) - .withStackTrace(exc); + .withMessage(exc.getMessage()); + // avoid exposing stack traces for client errors, but include them for server errors to aid debugging + if (errorCode == 500) { + errorBuilder.withStackTrace(exc); + } } private static Namespace namespaceFromPathVars(Map pathVars) { diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java index 6140f40b2de5..4e4e6918dbfe 100644 --- 
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java @@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.rest.HMSCatalogAdapter.Route; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; +import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; @@ -82,8 +83,13 @@ protected void service(HttpServletRequest request, HttpServletResponse response) if (responseBody != null) { RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody); } + } catch (RESTException e) { + // A RESTException is thrown by HMSCatalogAdapter.execute() after the error handler has + // already written the correct HTTP status and body to the response (e.g. 404, 403). + // It is not an unexpected server failure, so log at DEBUG to avoid flooding the console. + LOG.debug("REST request resulted in a client error (already handled): {}", e.getMessage()); } catch (RuntimeException | IOException e) { - // should be a RESTException but not able to see them through dependencies + // Genuine unexpected server error – log the full stack trace. 
LOG.error("Error processing REST request", e); response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/responses/HMSCacheStatsResponse.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/responses/HMSCacheStatsResponse.java new file mode 100644 index 000000000000..1424010fd693 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/responses/HMSCacheStatsResponse.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.rest.responses; + +import org.apache.iceberg.rest.RESTResponse; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +public record HMSCacheStatsResponse(Map stats) implements RESTResponse { + public HMSCacheStatsResponse(Map stats) { + this.stats = Collections.unmodifiableMap(new TreeMap<>(stats)); + } + + @Override + public void validate() { + // nothing + } +} diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java new file mode 100644 index 000000000000..f1b56e0e68e0 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType; +import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension; +import org.apache.iceberg.rest.responses.HMSCacheStatsResponse; +import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Integration tests that verify the {@link HMSCachingCatalog} cache-statistics counters + * (hit, miss, load, hit-rate) are updated correctly and exposed accurately via the + * {@code GET v1/cache/stats} REST endpoint. + * + *

<p>The server is started with {@link AuthType#NONE} so the tests focus purely on + * caching behaviour without any authentication noise.
+ */ + private HMSCacheStatsResponse fetchCacheStats() throws Exception { + String statsUrl = REST_CATALOG_EXTENSION.getRestEndpoint() + "/v1/cache/stats"; + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(statsUrl)) + .GET() + .build(); + HttpResponse response; + try (HttpClient client = HttpClient.newHttpClient()) { + response = client.send(request, HttpResponse.BodyHandlers.ofString()); + } + Assertions.assertEquals(200, response.statusCode(), + "Expected HTTP 200 from cache stats endpoint, got: " + response.statusCode()); + return new ObjectMapper().readValue(response.body(), HMSCacheStatsResponse.class); + } + + + /** + * Verifies that the {@link HMSCachingCatalog} correctly tracks cache hits, misses and + * loads, and that those counters are accurately returned via the REST endpoint. + * + *

<p>Strategy:
 + * <ol>
 + *   <li>Snapshot baseline stats before any operations so the test is isolated from
 + * cumulative counters left by previous tests.</li>
 + *   <li>Create a namespace and a table (bypasses the cache – done via
 + * {@link org.apache.iceberg.hive.HiveCatalog} directly).</li>
 + *   <li>First {@code loadTable} call → cache miss + actual load.</li>
 + *   <li>Second and third {@code loadTable} calls → cache hits (metadata location
 + * has not changed, so the cached entry is still valid).</li>
 + *   <li>Fetch stats again and assert the deltas against the baseline.</li>
 + * </ol>
+ */ + @Test + void testCacheCountersAreUpdated() throws Exception { + // -- baseline --------------------------------------------------------------- + HMSCacheStatsResponse baseline = fetchCacheStats(); + long baseHit = baseline.stats().getOrDefault("hit", 0L).longValue(); + long baseMiss = baseline.stats().getOrDefault("miss", 0L).longValue(); + long baseLoad = baseline.stats().getOrDefault("load", 0L).longValue(); + + // -- exercise the cache ----------------------------------------------------- + var db = Namespace.of("caching_stats_test_db"); + var tableId = TableIdentifier.of(db, "caching_stats_test_table"); + + catalog.createNamespace(db); + catalog.createTable(tableId, new Schema()); + + // First load → cache miss + load + catalog.loadTable(tableId); + // Second load → cache hit (metadata location unchanged) + catalog.loadTable(tableId); + // Third load → cache hit + catalog.loadTable(tableId); + + // Mutate the table by appending a data file – this creates a new snapshot + // which advances METADATA_LOCATION in HMS, so the next loadTable call through + // the caching catalog will detect the stale cached location and invalidate it. 
+ Table table = serverCatalog.loadTable(tableId); + DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(table.location() + "/data/fake-0.parquet") + .withFileSizeInBytes(1024) + .withRecordCount(1) + .build(); + table.newAppend() + .appendFile(dataFile) + .commit(); + + long baseInvalidate = fetchCacheStats().stats().getOrDefault("invalidate", 0L).longValue(); + + // Fourth load → cache invalidation + load (cached location != HMS location) + catalog.loadTable(tableId); + + // -- fetch updated stats via the REST endpoint ------------------------------ + HMSCacheStatsResponse after = fetchCacheStats(); + long deltaHit = after.stats().getOrDefault("hit", 0L).longValue() - baseHit; + long deltaMiss = after.stats().getOrDefault("miss", 0L).longValue() - baseMiss; + long deltaLoad = after.stats().getOrDefault("load", 0L).longValue() - baseLoad; + long deltaInvalidate = after.stats().getOrDefault("invalidate", 0L).longValue() - baseInvalidate; + + // -- assertions ------------------------------------------------------------- + Assertions.assertTrue(deltaMiss >= 1, + "Expected at least 1 cache miss (first loadTable), but delta was: " + deltaMiss); + Assertions.assertTrue(deltaLoad >= 2, + "Expected at least 2 cache loads (initial load + post-invalidation reload), but delta was: " + deltaLoad); + Assertions.assertTrue(deltaHit >= 2, + "Expected at least 2 cache hits (second + third loadTable), but delta was: " + deltaHit); + Assertions.assertTrue(deltaInvalidate >= 1, + "Expected at least 1 cache invalidation (metadata location changed after table update), but delta was: " + deltaInvalidate); + + // hit-rate must be a valid ratio in [0.0, 1.0] + double hitRate = after.stats().getOrDefault("hit-rate", 0.0).doubleValue(); + Assertions.assertTrue(hitRate >= 0.0 && hitRate <= 1.0, + "hit-rate must be in [0.0, 1.0] but was: " + hitRate); + } +} +