Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0

Check warning on line 10 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line does not match expected header line of ' * http://www.apache.org/licenses/LICENSE-2.0'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItv&open=AZ2c3HhvLdirLWE7GItv&pullRequest=6441
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.client.builder.GetTableProjectionsSpecBuilder;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.thrift.TException;

import java.util.Collections;
import java.util.List;

/**
* Fetches the location of a given metadata table.
* <p>Since the location mutates with each transaction, this allows determining if a cached version of the
* table is the latest known in the HMS database.</p>
*/
public class MetadataLocator {
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MetadataLocator.class);
static final GetProjectionsSpec PARAM_SPEC = new GetTableProjectionsSpecBuilder()
.includeParameters() // only fetches table.parameters
.build();
final HiveCatalog catalog;

Check warning on line 47 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Variable 'catalog' must be private and have accessor methods.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItw&open=AZ2c3HhvLdirLWE7GItw&pullRequest=6441

public MetadataLocator(HiveCatalog catalog) {
this.catalog = catalog;
}

/**
* Returns the location of the metadata table identified by the given identifier, or null if the table does not exist or is not a metadata table.

Check warning on line 54 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 147).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItx&open=AZ2c3HhvLdirLWE7GItx&pullRequest=6441
* <p>This uses the Thrift API to fetch the table parameters, which is more efficient than fetching the entire table object.</p>

Check warning on line 55 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 130).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GIty&open=AZ2c3HhvLdirLWE7GIty&pullRequest=6441
* @param identifier the identifier of the metadata table to fetch the location for
* @return the location of the metadata table, or null if the table does not exist or is not a metadata table
*/
public String getLocation(TableIdentifier identifier) {
final ClientPool<IMetaStoreClient, TException> clients = catalog.clientPool();

Check failure on line 60 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this usage of "clientPool", it is annotated with @VisibleForTesting and should not be accessed from production code.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItu&open=AZ2c3HhvLdirLWE7GItu&pullRequest=6441
final String catName = catalog.name();
final TableIdentifier baseTableIdentifier;
if (!catalog.isValidIdentifier(identifier)) {
if (!isValidMetadataIdentifier(identifier)) {
return null;
} else {
baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels());
}
} else {
baseTableIdentifier = identifier;
}
String database = baseTableIdentifier.namespace().level(0);
String tableName = baseTableIdentifier.name();
try {
List<Table> tables = clients.run(
client -> client.getTables(catName, database, Collections.singletonList(tableName), PARAM_SPEC)
);
return tables == null || tables.isEmpty()
? null
: tables.getFirst().getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
} catch (NoSuchTableException | NoSuchObjectException e) {
LOGGER.info("Table not found {}", baseTableIdentifier, e);
} catch (TException e) {
LOGGER.info("Table parameters fetch failed {}", baseTableIdentifier, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Interrupted in call to check table existence of {}", baseTableIdentifier, e);
}
return null;
}

private boolean isValidMetadataIdentifier(TableIdentifier identifier) {
return MetadataTableType.from(identifier.name()) != null && catalog.isValidIdentifier(TableIdentifier.of(identifier.namespace().levels()));

Check warning on line 93 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 143).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HhvLdirLWE7GItz&open=AZ2c3HhvLdirLWE7GItz&pullRequest=6441
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,78 +20,277 @@
package org.apache.iceberg.rest;

import com.github.benmanes.caffeine.cache.Ticker;

import java.lang.ref.SoftReference;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.MetadataLocator;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;

import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class that wraps an Iceberg Catalog to cache tables.
*/
public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog {
private final HiveCatalog hiveCatalog;

public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
super(catalog, true, expiration, Ticker.systemTicker());
protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class);

private static SoftReference<HMSCachingCatalog> CACHE = new SoftReference<>(null);

Check warning on line 62 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this field "CACHE" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2d-dcH1W-EvMDtd-Ke&open=AZ2d-dcH1W-EvMDtd-Ke&pullRequest=6441

Check warning on line 62 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Name 'CACHE' must match pattern '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2d-dcH1W-EvMDtd-Kg&open=AZ2d-dcH1W-EvMDtd-Kg&pullRequest=6441
@TestOnly
public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) {
HMSCachingCatalog cache = CACHE.get();
if (cache == null) {
return null;
}
return extractor == null ? (C) cache : extractor.apply(cache);
}

@TestOnly
public HiveCatalog getCatalog() {
return hiveCatalog;
}

protected final HiveCatalog hiveCatalog;

Check warning on line 77 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Variable 'hiveCatalog' must be private and have accessor methods.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HmpLdirLWE7GIt2&open=AZ2c3HmpLdirLWE7GIt2&pullRequest=6441
// Metrics counters
private final AtomicLong cacheHitCount = new AtomicLong(0);
private final AtomicLong cacheMissCount = new AtomicLong(0);
private final AtomicLong cacheLoadCount = new AtomicLong(0);
private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);

public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
this(catalog, expirationMs, /*caseSensitive*/ true, null);
}

public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive, Configuration conf) {

Check warning on line 89 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused method parameter "conf".

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2c3HmpLdirLWE7GIt1&open=AZ2c3HmpLdirLWE7GIt1&pullRequest=6441
super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
this.hiveCatalog = catalog;
if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", false)) {
CACHE = new SoftReference<>(this);

Check warning on line 93 in standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this assignment of "CACHE".

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ2d-dcH1W-EvMDtd-Kf&open=AZ2d-dcH1W-EvMDtd-Kf&pullRequest=6441
}
}

/**
* Callback when cache invalidates the entry for a given table identifier.
*
* @param tid the table identifier to invalidate
*/
protected void onCacheInvalidate(TableIdentifier tid) {
cacheInvalidateCount.incrementAndGet();
LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
}

/**
* Callback when cache loads a table for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheLoad(TableIdentifier tid) {
cacheLoadCount.incrementAndGet();
LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
}

/**
* Callback when cache hit for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheHit(TableIdentifier tid) {
cacheHitCount.incrementAndGet();
LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
}

/**
* Callback when cache miss occurs for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheMiss(TableIdentifier tid) {
cacheMissCount.incrementAndGet();
LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
}

/**
* Callback when cache loads a metadata table for a given table identifier.
*
* @param tid the table identifier
*/
protected void onCacheMetaLoad(TableIdentifier tid) {
cacheMetaLoadCount.incrementAndGet();
LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
}

// Getter methods for accessing metrics
public long getCacheHitCount() {
return cacheHitCount.get();
}

public long getCacheMissCount() {
return cacheMissCount.get();
}

public long getCacheLoadCount() {
return cacheLoadCount.get();
}

public long getCacheInvalidateCount() {
return cacheInvalidateCount.get();
}

public long getCacheMetaLoadCount() {
return cacheMetaLoadCount.get();
}

public double getCacheHitRate() {
long hits = cacheHitCount.get();
long total = hits + cacheMissCount.get();
return total == 0 ? 0.0 : (double) hits / total;
}

/**
* Generates a map of this cache's performance metrics, including hit count,
* miss count, load count, invalidate count, meta-load count, and hit rate.
* This can be used for monitoring and debugging purposes to understand the effectiveness of the cache.
* @return a map of cache performance metrics
*/
public Map<String, Number> cacheStats() {
return Map.of(
"hit", getCacheHitCount(),
"miss", getCacheMissCount(),
"load", getCacheLoadCount(),
"invalidate", getCacheInvalidateCount(),
"metaload", getCacheMetaLoadCount(),
"hit-rate", getCacheHitRate()
);
}


@Override
public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return hiveCatalog.buildTable(identifier, schema);
public void createNamespace(Namespace namespace, Map<String, String> map) {
hiveCatalog.createNamespace(namespace, map);
}

@Override
public void createNamespace(Namespace nmspc, Map<String, String> map) {
hiveCatalog.createNamespace(nmspc, map);
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
return hiveCatalog.listNamespaces(namespace);
}

@Override
public List<Namespace> listNamespaces(Namespace nmspc) throws NoSuchNamespaceException {
return hiveCatalog.listNamespaces(nmspc);
public Table loadTable(final TableIdentifier identifier) {
final TableIdentifier canonicalized = identifier.toLowerCase();
final Table cachedTable = tableCache.getIfPresent(canonicalized);
if (cachedTable != null) {
final String location = new MetadataLocator(hiveCatalog).getLocation(canonicalized);
if (location == null) {
LOG.debug("Table {} has no location, returning cached table without location", canonicalized);
} else {
String cachedLocation = cachedTable instanceof HasTableOperations tableOps
? tableOps.operations().current().metadataFileLocation()
: null;
if (!location.equals(cachedLocation)) {
LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location);
// Invalidate the cached table if the location is different
invalidateTable(canonicalized);
onCacheInvalidate(canonicalized);
} else {
onCacheHit(canonicalized);
return cachedTable;
}
}
} else {
onCacheMiss(canonicalized);
}
final Table table = tableCache.get(canonicalized, this::loadTableWithoutCache);
if (table instanceof BaseMetadataTable) {
// Cache underlying table
TableIdentifier originTableIdentifier =
TableIdentifier.of(canonicalized.namespace().levels());
Table originTable = tableCache.get(originTableIdentifier, this::loadTableWithoutCache);
// Share TableOperations instance of origin table for all metadata tables, so that metadata
// table instances are refreshed as well when origin table instance is refreshed.
if (originTable instanceof HasTableOperations tableOps) {
TableOperations ops = tableOps.operations();
MetadataTableType type = MetadataTableType.from(canonicalized.name());
Table metadataTable =
MetadataTableUtils.createMetadataTableInstance(
ops, hiveCatalog.name(), originTableIdentifier, canonicalized, type);
tableCache.put(canonicalized, metadataTable);
onCacheMetaLoad(canonicalized);
LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier);
// Return the metadata table instead of the original table
return metadataTable;
}
}
onCacheLoad(canonicalized);
return table;
}

private Table loadTableWithoutCache(TableIdentifier identifier) {
try {
return hiveCatalog.loadTable(identifier);
} catch (NoSuchTableException exception) {
return null;
}
}

@Override
public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException {
return hiveCatalog.loadNamespaceMetadata(nmspc);
public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
return hiveCatalog.loadNamespaceMetadata(namespace);
}

@Override
public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException {
List<TableIdentifier> tables = listTables(nmspc);
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
List<TableIdentifier> tables = listTables(namespace);
for (TableIdentifier ident : tables) {
invalidateTable(ident);
}
return hiveCatalog.dropNamespace(nmspc);
return hiveCatalog.dropNamespace(namespace);
}

@Override
public boolean setProperties(Namespace nmspc, Map<String, String> map) throws NoSuchNamespaceException {
return hiveCatalog.setProperties(nmspc, map);
public boolean setProperties(Namespace namespace, Map<String, String> map) throws NoSuchNamespaceException {
return hiveCatalog.setProperties(namespace, map);
}

@Override
public boolean removeProperties(Namespace nmspc, Set<String> set) throws NoSuchNamespaceException {
return hiveCatalog.removeProperties(nmspc, set);
public boolean removeProperties(Namespace namespace, Set<String> set) throws NoSuchNamespaceException {
return hiveCatalog.removeProperties(namespace, set);
}

@Override
public boolean namespaceExists(Namespace namespace) {
return hiveCatalog.namespaceExists(namespace);
}

@Override
public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return hiveCatalog.buildTable(identifier, schema);
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
return hiveCatalog.listViews(namespace);
Expand Down
Loading
Loading