Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.cassandra.metrics;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import com.codahale.metrics.Counter;

Expand All @@ -28,6 +30,7 @@
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.transport.messages.ResultMessage;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
Expand Down Expand Up @@ -66,18 +69,22 @@ public static void recordRowAndColumnCountMetrics(Collection<? extends IMutation

int rowCount = 0;
int columnCount = 0;
Map<TableId, Integer> rowsMutatedPerRequest = new HashMap<>();

for (IMutation mutation : mutations)
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
columnCount += update.affectedColumnCount();
rowCount += update.affectedRowCount();
int affectedRows = update.affectedRowCount();
rowCount += affectedRows;
rowsMutatedPerRequest.merge(update.metadata().id, affectedRows, Integer::sum);
}
}

ClientRequestSizeMetrics.totalColumnsWritten.inc(columnCount);
ClientRequestSizeMetrics.totalRowsWritten.inc(rowCount);
incrementRowsMutatedPerWriteRequest(rowsMutatedPerRequest);
}

public static void recordRowAndColumnCountMetrics(PartitionUpdate update)
Expand All @@ -86,6 +93,20 @@ public static void recordRowAndColumnCountMetrics(PartitionUpdate update)
return;

ClientRequestSizeMetrics.totalColumnsWritten.inc(update.affectedColumnCount());
ClientRequestSizeMetrics.totalRowsWritten.inc(update.affectedRowCount());
int affectedRows = update.affectedRowCount();
ClientRequestSizeMetrics.totalRowsWritten.inc(affectedRows);
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(update.metadata().id);
if (cfs != null)
cfs.metric.rowsMutatedPerWriteHistogram.update(affectedRows);
}

/**
 * Records, for each table touched by a single write request, the total number of
 * rows mutated in that request into the table's RowsMutatedPerWriteHistogram.
 *
 * @param rowsMutatedPerRequest rows mutated during this request, keyed by table id
 */
private static void incrementRowsMutatedPerWriteRequest(Map<TableId, Integer> rowsMutatedPerRequest)
{
    rowsMutatedPerRequest.forEach((tableId, rows) ->
    {
        // The table may have been dropped concurrently; getIfExists returns null then.
        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
        if (cfs != null)
            cfs.metric.rowsMutatedPerWriteHistogram.update(rows);
    });
}
}
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class KeyspaceMetrics
public final Histogram sstablesPerReadHistogram;
/** Histogram of the number of sstable data files accessed per partition range read */
public final Histogram sstablesPerRangeReadHistogram;
/** Histogram of rows mutated per write request in this keyspace. */
public final Histogram rowsMutatedPerWriteHistogram;
/** Tombstones scanned in queries on this Keyspace */
public final Histogram tombstoneScannedHistogram;
/** Purgeable tombstones scanned in queries on this Keyspace */
Expand Down Expand Up @@ -255,6 +257,7 @@ public KeyspaceMetrics(final Keyspace ks)
// create histograms for TableMetrics to replicate updates to
sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true);
sstablesPerRangeReadHistogram = createKeyspaceHistogram("SSTablesPerRangeReadHistogram", true);
rowsMutatedPerWriteHistogram = createKeyspaceHistogram("RowsMutatedPerWriteHistogram", false);
tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false);
purgeableTombstoneScannedHistogram = createKeyspaceHistogram("PurgeableTombstoneScannedHistogram", false);
liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false);
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/metrics/TableMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ public class TableMetrics
public final SnapshottingTimer coordinatorReadLatency;
public final Timer coordinatorScanLatency;
public final SnapshottingTimer coordinatorWriteLatency;
/** Histogram of rows mutated per write request on this table. */
public final TableHistogram rowsMutatedPerWriteHistogram;

private final TableMetricNameFactory factory;
private final TableMetricNameFactory aliasFactory;
Expand Down Expand Up @@ -816,6 +818,7 @@ public Long getValue()
coordinatorReadLatency = createTableTimer("CoordinatorReadLatency");
coordinatorScanLatency = createTableTimer("CoordinatorScanLatency");
coordinatorWriteLatency = createTableTimer("CoordinatorWriteLatency");
rowsMutatedPerWriteHistogram = createTableHistogram("RowsMutatedPerWriteHistogram", cfs.keyspace.metric.rowsMutatedPerWriteHistogram, false);

// We do not want to capture view mutation specific metrics for a view
// They only makes sense to capture on the base table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.paxos.Paxos;
Expand Down Expand Up @@ -219,11 +220,13 @@ public void shouldRecordReadMetricsForTokenAndClusteringSlice()
/**
 * A single INSERT of one row with one regular column should bump the global
 * row/column written counters by one each and record exactly one sample in the
 * per-table RowsMutatedPerWriteHistogram.
 */
public void shouldRecordWriteMetricsForSingleValueRow()
{
    createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
    ColumnFamilyStore tableStore = currentTableStore();

    executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");

    // One row and one regular column written by the single INSERT above.
    assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
    assertEquals(1, ClientRequestSizeMetrics.totalColumnsWritten.getCount());
    // The write request touched one table, so its histogram holds one sample.
    assertEquals(1, tableStore.metric.rowsMutatedPerWriteHistogram.cf.getCount());
}

@Test
Expand Down Expand Up @@ -254,6 +257,7 @@ public void shouldRecordWriteMetricsForMultiValueRow()
public void shouldRecordWriteMetricsForBatch() throws Exception
{
createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
ColumnFamilyStore cfs = currentTableStore();

try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
{
Expand All @@ -269,9 +273,15 @@ public void shouldRecordWriteMetricsForBatch() throws Exception
// The metrics should reflect the batch as a single write operation with multiple rows and columns.
assertEquals(2, ClientRequestSizeMetrics.totalRowsWritten.getCount());
assertEquals(4, ClientRequestSizeMetrics.totalColumnsWritten.getCount());
assertEquals(1, cfs.metric.rowsMutatedPerWriteHistogram.cf.getCount());
}
}

/**
 * Looks up the {@link ColumnFamilyStore} backing the table created by the
 * current test, or {@code null} if it does not exist.
 */
private ColumnFamilyStore currentTableStore()
{
    String table = currentTable();
    return ColumnFamilyStore.getIfExists(KEYSPACE, table);
}

@Test
public void shouldRecordWriteMetricsForCellDeletes()
{
Expand Down