Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/cache/AutoSavingCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ else if (cacheType == CacheService.CacheType.COUNTER_CACHE)
type,
0,
keysEstimate,
keysEstimate,
Unit.KEYS,
nextTimeUUID(),
getCacheDataPath(CURRENT_VERSION).toPath().toString());
Expand All @@ -341,7 +342,8 @@ public CompactionInfo getCompactionInfo()
{
// keyset can change in size, thus total can too
// TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
long totalKeys = Math.max(keysWritten, keysEstimate);
return info.forProgress(keysWritten, totalKeys, totalKeys);
}

public void saveCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ public void finishCompaction(CompactionInfo.Holder ci)
{
compactions.remove(ci);
CompactionManager.instance.getMetrics().bytesCompacted.inc(ci.getCompactionInfo().getTotal());
CompactionManager.instance.getMetrics().compressedBytesCompacted.inc(ci.getCompactionInfo().getTotalCompressed());
CompactionManager.instance.getMetrics().totalCompactionsCompleted.mark();
}

/**
* Get the estimated number of bytes remaining to write per sstable directory
*/
public Map<File, Long> estimatedRemainingWriteBytes()
public Map<File, Long> estimatedRemainingWriteToDiskBytes()
{
synchronized (compactions)
{
Expand All @@ -66,7 +67,7 @@ public Map<File, Long> estimatedRemainingWriteBytes()
List<File> directories = compactionInfo.getTargetDirectories();
if (directories == null || directories.isEmpty())
continue;
long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size();
long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteToDiskBytes() / directories.size();
for (File directory : directories)
writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum);
}
Expand Down
41 changes: 27 additions & 14 deletions src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class CompactionInfo
public static final String COLUMNFAMILY = "columnfamily";
public static final String COMPLETED = "completed";
public static final String TOTAL = "total";
public static final String TOTAL_COMPRESSED = "totalCompressed";
public static final String TASK_TYPE = "taskType";
public static final String UNIT = "unit";
public static final String COMPACTION_ID = "compactionId";
Expand All @@ -52,55 +53,57 @@ public final class CompactionInfo
private final OperationType tasktype;
private final long completed;
private final long total;
private final long totalCompressed;
private final Unit unit;
private final TimeUUID compactionId;
private final ImmutableSet<SSTableReader> sstables;
private final String targetDirectory;

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, Collection<? extends SSTableReader> sstables, String targetDirectory)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, Collection<? extends SSTableReader> sstables, String targetDirectory)
{
this.tasktype = tasktype;
this.completed = completed;
this.total = total;
this.totalCompressed = totalCompressed;
this.metadata = metadata;
this.unit = unit;
this.compactionId = compactionId;
this.sstables = ImmutableSet.copyOf(sstables);
this.targetDirectory = targetDirectory;
}

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection<SSTableReader> sstables, String targetDirectory)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection<SSTableReader> sstables, String targetDirectory)
{
this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, targetDirectory);
this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, targetDirectory);
}

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection<? extends SSTableReader> sstables)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection<? extends SSTableReader> sstables)
{
this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, null);
this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, null);
}

/**
* Special compaction info where we always need to cancel the compaction - for example ViewBuilderTask where we don't know
* the sstables at construction
*/
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId)
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId)
{
return withoutSSTables(metadata, tasktype, completed, total, unit, compactionId, null);
return withoutSSTables(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, null);
}

/**
* Special compaction info where we always need to cancel the compaction - for example AutoSavingCache where we don't know
* the sstables at construction
*/
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, String targetDirectory)
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, String targetDirectory)
{
return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of(), targetDirectory);
return new CompactionInfo(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, ImmutableSet.of(), targetDirectory);
}

/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
public CompactionInfo forProgress(long complete, long total, long totalCompressed)
{
return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables, targetDirectory);
return new CompactionInfo(metadata, tasktype, complete, total, totalCompressed, unit, compactionId, sstables, targetDirectory);
}

public Optional<String> getKeyspace()
Expand Down Expand Up @@ -128,6 +131,11 @@ public long getTotal()
return total;
}

public long getTotalCompressed()
{
return totalCompressed;
}

public OperationType getTaskType()
{
return tasktype;
Expand Down Expand Up @@ -183,12 +191,16 @@ public String targetDirectory()
/**
* Note that this estimate is based on the amount of data we have left to read - it assumes input
* size == output size for a compaction, which is not really true, but should most often provide a worst case
* remaining write size.
* remaining write size. We also scale by the effective compression ratio since total/completed are for the uncompressed size.
*/
public long estimatedRemainingWriteBytes()
public long estimatedRemainingWriteToDiskBytes()
{
if (unit == Unit.BYTES && tasktype.writesData)
return getTotal() - getCompleted();
{
final long total = getTotal();
double compressionRatio = total == 0 ? 1 : ((double) totalCompressed / (double)total);
return (long)(compressionRatio * (total - getCompleted()));
}
return 0;
}

Expand Down Expand Up @@ -216,6 +228,7 @@ public Map<String, String> asMap()
ret.put(COLUMNFAMILY, getTable().orElse(null));
ret.put(COMPLETED, Long.toString(completed));
ret.put(TOTAL, Long.toString(total));
ret.put(TOTAL_COMPRESSED, Long.toString(totalCompressed));
ret.put(TASK_TYPE, tasktype.toString());
ret.put(UNIT, unit.toString());
ret.put(COMPACTION_ID, compactionId == null ? "" : compactionId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private final long nowInSec;
private final TimeUUID compactionId;
private final long totalBytes;
private final long totalCompressedBytes;
private long bytesRead;
private long totalSourceCQLRows;

Expand Down Expand Up @@ -135,9 +136,14 @@ public CompactionIterator(OperationType type,
this.bytesRead = 0;

long bytes = 0;
long compressedBytes = 0;
for (ISSTableScanner scanner : scanners)
{
bytes += scanner.getLengthInBytes();
compressedBytes += scanner.getCompressedLengthInBytes();
}
this.totalBytes = bytes;
this.totalCompressedBytes = compressedBytes;
this.mergeCounters = new long[scanners.size()];
// note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before
// calling that to avoid a NPE.
Expand Down Expand Up @@ -170,6 +176,7 @@ public CompactionInfo getCompactionInfo()
type,
bytesRead,
totalBytes,
totalCompressedBytes,
compactionId,
sstables,
targetDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
for (File directory : newCompactionDatadirs)
expectedNewWriteSize.put(directory, writeSizePerOutputDatadir);

Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes();
Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes();

// todo: abort streams if they block compactions
if (cfs.getDirectories().hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ public CompactionInfo getCompactionInfo()
if (range.left.getPartitioner().splitter().isPresent())
{
long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, 1000, Unit.RANGES, compactionId);
}

// When there is no splitter, estimate based on number of total keys but
// take the max with keysBuilt + 1 to avoid having more completed than total
long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, keysTotal, Unit.KEYS, compactionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ final class SSTableTasksTable extends AbstractVirtualTable
private final static String PROGRESS = "progress";
private final static String SSTABLES = "sstables";
private final static String TOTAL = "total";
private final static String TOTAL_COMPRESSED = "total_compressed";
private final static String UNIT = "unit";
private final static String TARGET_DIRECTORY = "target_directory";

Expand All @@ -54,6 +55,7 @@ final class SSTableTasksTable extends AbstractVirtualTable
.addRegularColumn(PROGRESS, LongType.instance)
.addRegularColumn(SSTABLES, Int32Type.instance)
.addRegularColumn(TOTAL, LongType.instance)
.addRegularColumn(TOTAL_COMPRESSED, LongType.instance)
.addRegularColumn(UNIT, UTF8Type.instance)
.addRegularColumn(TARGET_DIRECTORY, UTF8Type.instance)
.build());
Expand All @@ -67,6 +69,7 @@ public DataSet data()
{
long completed = task.getCompleted();
long total = task.getTotal();
long totalCompressed = task.getTotalCompressed();

double completionRatio = total == 0L ? 1.0 : (((double) completed) / total);

Expand All @@ -79,6 +82,7 @@ public DataSet data()
.column(SSTABLES, task.getSSTables().size())
.column(TOTAL, total)
.column(UNIT, task.getUnit().toString().toLowerCase())
.column(TOTAL_COMPRESSED, totalCompressed)
.column(TARGET_DIRECTORY, task.targetDirectory());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
iter.getBytesRead(),
iter.getTotalBytes(),
iter.getTotalBytes(),
compactionId,
sstables);
sstables
);
}

public void build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalSizeInBytes,
totalSizeInBytes,
compactionId,
sstables.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalBytesToProcess,
totalBytesToProcess,
compactionId,
sstables.keySet(),
targetDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ public CompactionInfo getCompactionInfo()
OperationType.SCRUB,
dataFile.getFilePointer(),
dataFile.length(),
sstable.onDiskLength(),
scrubCompactionId,
ImmutableSet.of(sstable),
File.getPath(sstable.getFilename()).getParent().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ public CompactionInfo getCompactionInfo()
OperationType.VERIFY,
dataFile.getFilePointer(),
dataFile.length(),
sstable.onDiskLength(),
verificationCompactionId,
ImmutableSet.of(sstable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ static <T extends SSTableReader & IndexSummarySupport<T>> Pair<List<T>, List<Res

public CompactionInfo getCompactionInfo()
{
return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, memoryPoolBytes, Unit.BYTES, compactionId);
}

public boolean isGlobal()
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/metrics/CompactionMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class CompactionMetrics
public final Meter totalCompactionsCompleted;
/** Total number of bytes compacted since server [re]start */
public final Counter bytesCompacted;
/** Estimated compressed bytes compacted since server [re]start, computed by scaling uncompressed bytes by the compression ratio */
public final Counter compressedBytesCompacted;
/** Time spent redistributing index summaries */
public final Timer indexSummaryRedistributionTime;

Expand Down Expand Up @@ -146,6 +148,7 @@ public Long getValue()
});
totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted"));
bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted"));
compressedBytesCompacted = Metrics.counter(factory.createMetricName("CompressedBytesCompacted"));

// compaction failure metrics
compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced"));
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/streaming/StreamSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
for (FileStore fs : allWriteableFileStores)
newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
}
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(),
fileStoreMapper);
long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,7 @@ public Object getCompactionMetric(String metricName)
switch(metricName)
{
case "BytesCompacted":
case "CompressedBytesCompacted":
case "CompactionsAborted":
case "CompactionsReduced":
case "SSTablesDroppedFromCompaction":
Expand Down
Loading