Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,54 @@ public class MetricNames {
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
"preWriteBufferTruncateAsErrorPerSecond";

// --------------------------------------------------------------------------------------------
// RocksDB metrics
// --------------------------------------------------------------------------------------------
// Table-level RocksDB metrics (aggregated from all buckets of a table, Max aggregation)
/**
 * Maximum write stall duration across all buckets of this table (Max aggregation). As the
 * metric name states, the value is expressed in microseconds.
 */
public static final String ROCKSDB_WRITE_STALL_MICROS_MAX = "rocksdbWriteStallMicrosMax";

/**
 * Maximum get latency across all buckets of this table (Max aggregation). As the metric name
 * states, the value is expressed in microseconds.
 */
public static final String ROCKSDB_GET_LATENCY_MICROS_MAX = "rocksdbGetLatencyMicrosMax";

/**
 * Maximum write latency across all buckets of this table (Max aggregation). As the metric name
 * states, the value is expressed in microseconds.
 */
public static final String ROCKSDB_WRITE_LATENCY_MICROS_MAX = "rocksdbWriteLatencyMicrosMax";

/** Maximum number of L0 files across all buckets of this table (Max aggregation). */
public static final String ROCKSDB_NUM_FILES_AT_LEVEL0_MAX = "rocksdbNumFilesAtLevel0Max";

/** Maximum flush pending indicator across all buckets of this table (Max aggregation). */
public static final String ROCKSDB_FLUSH_PENDING_MAX = "rocksdbFlushPendingMax";

/** Maximum compaction pending indicator across all buckets of this table (Max aggregation). */
public static final String ROCKSDB_COMPACTION_PENDING_MAX = "rocksdbCompactionPendingMax";

/**
 * Maximum compaction time across all buckets of this table (Max aggregation). As the metric
 * name states, the value is expressed in microseconds.
 */
public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX =
"rocksdbCompactionTimeMicrosMax";

// Table-level RocksDB metrics (aggregated from all buckets of a table, Sum aggregation)
/** Total bytes read across all buckets of this table (Sum aggregation). */
public static final String ROCKSDB_BYTES_READ_TOTAL = "rocksdbBytesReadTotal";

/** Total bytes written across all buckets of this table (Sum aggregation). */
public static final String ROCKSDB_BYTES_WRITTEN_TOTAL = "rocksdbBytesWrittenTotal";

/** Total flush bytes written across all buckets of this table (Sum aggregation). */
public static final String ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL = "rocksdbFlushBytesWrittenTotal";

/** Total compaction bytes read across all buckets of this table (Sum aggregation). */
public static final String ROCKSDB_COMPACTION_BYTES_READ_TOTAL =
"rocksdbCompactionBytesReadTotal";

/** Total compaction bytes written across all buckets of this table (Sum aggregation). */
public static final String ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL =
"rocksdbCompactionBytesWrittenTotal";

// Server-level RocksDB metrics (aggregated from all tables, Sum aggregation)
/**
 * Total memory usage across all RocksDB instances in this server (Sum aggregation).
 *
 * <p>NOTE(review): the unit is presumably bytes; confirm against the gauge that reports it.
 */
public static final String ROCKSDB_MEMORY_USAGE_TOTAL = "rocksdbMemoryUsageTotal";

// --------------------------------------------------------------------------------------------
// metrics for table bucket
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
currentKvs.get(tableBucket).getKvTabletDir().getAbsolutePath()));
}
this.currentKvs.put(tableBucket, kvTablet);

return kvTablet;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
Expand Down Expand Up @@ -118,6 +119,9 @@ public final class KvTablet {
// the changelog image mode for this tablet
private final ChangelogImage changelogImage;

// RocksDB statistics accessor for this tablet
@Nullable private final RocksDBStatistics rocksDBStatistics;

/**
* The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
* flushed into kv.
Expand All @@ -142,7 +146,8 @@ private KvTablet(
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter,
ChangelogImage changelogImage) {
ChangelogImage changelogImage,
@Nullable RocksDBStatistics rocksDBStatistics) {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
this.logTablet = logTablet;
Expand All @@ -158,6 +163,7 @@ private KvTablet(
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
this.changelogImage = changelogImage;
this.rocksDBStatistics = rocksDBStatistics;
}

public static KvTablet create(
Expand All @@ -177,6 +183,19 @@ public static KvTablet create(
RateLimiter sharedRateLimiter)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter);

// Create RocksDB statistics accessor (will be registered to TableMetricGroup by Replica)
// Pass ResourceGuard to ensure thread-safe access during concurrent close operations
// Pass ColumnFamilyHandle for column family specific properties like num-files-at-level0
// Pass Cache for accurate block cache memory tracking
RocksDBStatistics rocksDBStatistics =
new RocksDBStatistics(
kv.getDb(),
kv.getStatistics(),
kv.getResourceGuard(),
kv.getDefaultColumnFamilyHandle(),
kv.getBlockCache());

return new KvTablet(
tablePath,
tableBucket,
Expand All @@ -192,14 +211,16 @@ public static KvTablet create(
rowMerger,
arrowCompressionInfo,
schemaGetter,
changelogImage);
changelogImage,
rocksDBStatistics);
}

private static RocksDBKv buildRocksDBKv(
Configuration configuration, File kvDir, RateLimiter sharedRateLimiter)
throws IOException {
// Enable statistics to support RocksDB statistics collection
RocksDBResourceContainer rocksDBResourceContainer =
new RocksDBResourceContainer(configuration, kvDir, false, sharedRateLimiter);
new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter);
RocksDBKvBuilder rocksDBKvBuilder =
new RocksDBKvBuilder(
kvDir,
Expand All @@ -225,6 +246,16 @@ public File getKvTabletDir() {
return kvTabletDir;
}

/**
 * Returns the accessor used to read the RocksDB statistics of this tablet.
 *
 * <p>The value is the one handed to the constructor and never changes afterwards; callers must
 * be prepared for {@code null}.
 *
 * @return the RocksDB statistics accessor, or {@code null} if none was supplied
 */
@Nullable
public RocksDBStatistics getRocksDBStatistics() {
    return this.rocksDBStatistics;
}

/**
 * Overwrites the flushed log offset tracked by this tablet.
 *
 * @param flushedLogOffset the new flushed log offset; presumably kv data in the pre-write
 *     buffer below this offset has already been flushed into kv (TODO confirm against the
 *     field's own documentation)
 */
void setFlushedLogOffset(long flushedLogOffset) {
this.flushedLogOffset = flushedLogOffset;
}
Expand Down Expand Up @@ -621,6 +652,8 @@ public void close() throws Exception {
if (isClosed) {
return;
}
// Note: RocksDB metrics lifecycle is managed by TableMetricGroup
// No need to close it here
if (rocksDBKv != null) {
rocksDBKv.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.apache.fluss.utils.BytesUtils;
import org.apache.fluss.utils.IOUtils;

import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteOptions;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -63,19 +65,24 @@ public class RocksDBKv implements AutoCloseable {
/** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. */
protected final RocksDB db;

/** RocksDB Statistics for metrics collection. */
private final @Nullable Statistics statistics;

// mark whether this kv is already closed and prevent duplicate closing
private volatile boolean closed = false;

public RocksDBKv(
RocksDBResourceContainer optionsContainer,
RocksDB db,
ResourceGuard rocksDBResourceGuard,
ColumnFamilyHandle defaultColumnFamilyHandle) {
ColumnFamilyHandle defaultColumnFamilyHandle,
@Nullable Statistics statistics) {
this.optionsContainer = optionsContainer;
this.db = db;
this.rocksDBResourceGuard = rocksDBResourceGuard;
this.writeOptions = optionsContainer.getWriteOptions();
this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
this.statistics = statistics;
}

public ResourceGuard getResourceGuard() {
Expand Down Expand Up @@ -206,4 +213,18 @@ public void close() throws Exception {
/**
 * Exposes the RocksDB instance held by this kv.
 *
 * @return the RocksDB database handle passed in at construction time
 */
public RocksDB getDb() {
    return this.db;
}

/**
 * Returns the RocksDB {@link Statistics} object this kv was constructed with.
 *
 * <p>The value comes from the {@code statistics} field assigned in the constructor instead of
 * being re-queried from the options container, so the getter always agrees with the
 * constructor argument and the field is no longer dead state.
 *
 * @return the statistics handle, or {@code null} if none was supplied at construction time
 */
@Nullable
public Statistics getStatistics() {
    return statistics;
}

/**
 * Returns the block cache reported by the options container of this kv.
 *
 * @return the block cache, or {@code null} if the options container has none
 */
@Nullable
public Cache getBlockCache() {
    final Cache containerCache = optionsContainer.getBlockCache();
    return containerCache;
}

/**
 * Returns the default column family handle this kv was constructed with.
 *
 * @return the default column family handle
 */
public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
    return this.defaultColumnFamilyHandle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ public RocksDBKv build() throws KvBuildingException {
throw new KvBuildingException(errMsg, t);
}
LOG.info("Finished building RocksDB kv at {}.", instanceBasePath);
return new RocksDBKv(optionsContainer, db, rocksDBResourceGuard, defaultColumnFamilyHandle);
return new RocksDBKv(
optionsContainer,
db,
rocksDBResourceGuard,
defaultColumnFamilyHandle,
optionsContainer.getStatistics());
}

void prepareDirectories() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RateLimiter;
import org.rocksdb.ReadOptions;
Expand Down Expand Up @@ -80,6 +82,12 @@ public class RocksDBResourceContainer implements AutoCloseable {
/** The shared rate limiter for all RocksDB instances. */
private final RateLimiter sharedRateLimiter;

/** The statistics object for RocksDB, null if statistics is disabled. */
@Nullable private Statistics statistics;

/** The block cache for RocksDB, shared across column families. */
@Nullable private Cache blockCache;

/** The handles to be closed when the container is closed. */
private final ArrayList<AutoCloseable> handlesToClose;

Expand Down Expand Up @@ -138,14 +146,26 @@ public DBOptions getDbOptions() throws IOException {
opt.setRateLimiter(sharedRateLimiter);

if (enableStatistics) {
Statistics statistics = new Statistics();
statistics = new Statistics();
opt.setStatistics(statistics);
handlesToClose.add(statistics);
}

return opt;
}

/**
 * Returns the {@link Statistics} object held by this container.
 *
 * @return the statistics object if statistics is enabled, {@code null} otherwise
 */
@Nullable
public Statistics getStatistics() {
    return this.statistics;
}

/**
 * Returns the block cache used by RocksDB.
 *
 * @return the block cache, or {@code null} if it has not been initialized yet
 */
@Nullable
public Cache getBlockCache() {
    return this.blockCache;
}

/** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. */
public ColumnFamilyOptions getColumnOptions() {
// initial options from common profile
Expand Down Expand Up @@ -282,8 +302,11 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
blockBasedTableConfig.setMetadataBlockSize(
internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes());

blockBasedTableConfig.setBlockCacheSize(
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes());
// Create explicit LRUCache for accurate memory tracking
long blockCacheSize = internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes();
blockCache = new LRUCache(blockCacheSize);
handlesToClose.add(blockCache);
blockBasedTableConfig.setBlockCache(blockCache);

if (internalGetOption(ConfigOptions.KV_USE_BLOOM_FILTER)) {
final double bitsPerKey = internalGetOption(ConfigOptions.KV_BLOOM_FILTER_BITS_PER_KEY);
Expand Down
Loading