Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public ResultMessage.Rows execute(QueryState state, QueryOptions options, Dispat
unmask);
}
if (!SchemaConstants.isSystemKeyspace(table.keyspace))
ClientRequestSizeMetrics.recordReadResponseMetrics(rows, restrictions, selection);
ClientRequestSizeMetrics.recordReadResponseMetrics(rows, restrictions, selection, table);

return rows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.Selection;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.transport.messages.ResultMessage;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
Expand All @@ -41,13 +43,17 @@ public class ClientRequestSizeMetrics
public static final Counter totalColumnsWritten = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ColumnsWritten", null));
public static final Counter totalRowsWritten = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "RowsWritten", null));

public static void recordReadResponseMetrics(ResultMessage.Rows rows, StatementRestrictions restrictions, Selection selection)
public static void recordReadResponseMetrics(ResultMessage.Rows rows,
StatementRestrictions restrictions,
Selection selection,
TableMetadata table)
{
if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
return;

int rowCount = rows.result.size();
ClientRequestSizeMetrics.totalRowsRead.inc(rowCount);
incrementTableRowsRead(table, rowCount);

int nonRestrictedColumns = selection.getColumns().size();

Expand All @@ -72,7 +78,9 @@ public static void recordRowAndColumnCountMetrics(Collection<? extends IMutation
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
columnCount += update.affectedColumnCount();
rowCount += update.affectedRowCount();
int affectedRows = update.affectedRowCount();
rowCount += affectedRows;
incrementTableRowsMutated(update.metadata(), affectedRows);
}
}

Expand All @@ -86,6 +94,22 @@ public static void recordRowAndColumnCountMetrics(PartitionUpdate update)
return;

ClientRequestSizeMetrics.totalColumnsWritten.inc(update.affectedColumnCount());
ClientRequestSizeMetrics.totalRowsWritten.inc(update.affectedRowCount());
int affectedRows = update.affectedRowCount();
ClientRequestSizeMetrics.totalRowsWritten.inc(affectedRows);
incrementTableRowsMutated(update.metadata(), affectedRows);
}

private static void incrementTableRowsRead(TableMetadata table, int rowCount)
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(table.id);
if (cfs != null)
cfs.metric.rowsRead.inc(rowCount);
}

private static void incrementTableRowsMutated(TableMetadata table, int rowCount)
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(table.id);
if (cfs != null)
cfs.metric.rowsMutated.inc(rowCount);
}
}
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public class KeyspaceMetrics
public final Counter speculativeInsufficientReplicas;
/** Needed to write to a transient replica to satisfy quorum **/
public final Counter additionalWrites;
/** Total number of rows returned to clients for this keyspace. */
public final Counter rowsRead;
/** Total number of rows mutated for this keyspace. */
public final Counter rowsMutated;
/** Number of started repairs as coordinator on this keyspace */
public final Counter repairsStarted;
/** Number of completed repairs as coordinator on this keyspace */
Expand Down Expand Up @@ -279,6 +283,8 @@ public KeyspaceMetrics(final Keyspace ks)
speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", metric -> metric.speculativeFailedRetries.getCount());
speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas", metric -> metric.speculativeInsufficientReplicas.getCount());
additionalWrites = createKeyspaceCounter("AdditionalWrites", metric -> metric.additionalWrites.getCount());
rowsRead = createKeyspaceCounter("RowsRead", metric -> metric.rowsRead.getCount());
rowsMutated = createKeyspaceCounter("RowsMutated", metric -> metric.rowsMutated.getCount());
repairsStarted = createKeyspaceCounter("RepairJobsStarted", metric -> metric.repairsStarted.getCount());
repairsCompleted = createKeyspaceCounter("RepairJobsCompleted", metric -> metric.repairsCompleted.getCount());
repairTime =createKeyspaceTimer("RepairTime");
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/metrics/TableMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ public class TableMetrics
public final SnapshottingTimer coordinatorReadLatency;
public final Timer coordinatorScanLatency;
public final SnapshottingTimer coordinatorWriteLatency;
/** Total number of rows returned to clients for this table. */
public final Counter rowsRead;
/** Total number of rows mutated for this table. */
public final Counter rowsMutated;

private final TableMetricNameFactory factory;
private final TableMetricNameFactory aliasFactory;
Expand Down Expand Up @@ -816,6 +820,8 @@ public Long getValue()
coordinatorReadLatency = createTableTimer("CoordinatorReadLatency");
coordinatorScanLatency = createTableTimer("CoordinatorScanLatency");
coordinatorWriteLatency = createTableTimer("CoordinatorWriteLatency");
rowsRead = createTableCounter("RowsRead");
rowsMutated = createTableCounter("RowsMutated");

// We do not want to capture view mutation specific metrics for a view
// They only makes sense to capture on the base table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.paxos.Paxos;
Expand Down Expand Up @@ -67,12 +68,14 @@ public void clearMetrics()
public void shouldRecordReadMetricsForMultiRowPartitionSelection()
{
createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
ColumnFamilyStore cfs = currentTableStore();

executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1");

assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
assertEquals(2, cfs.metric.rowsRead.getCount());
// The partition key is provided by the client in the request, so we don't consider those columns as read.
assertEquals(4, ClientRequestSizeMetrics.totalColumnsRead.getCount());
assertRowsContains(executeNet("SELECT * FROM system_metrics.client_request_size_group"),
Expand Down Expand Up @@ -254,6 +257,7 @@ public void shouldRecordWriteMetricsForMultiValueRow()
public void shouldRecordWriteMetricsForBatch() throws Exception
{
createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
ColumnFamilyStore cfs = currentTableStore();

try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
{
Expand All @@ -269,9 +273,15 @@ public void shouldRecordWriteMetricsForBatch() throws Exception
// The metrics should reflect the batch as a single write operation with multiple rows and columns.
assertEquals(2, ClientRequestSizeMetrics.totalRowsWritten.getCount());
assertEquals(4, ClientRequestSizeMetrics.totalColumnsWritten.getCount());
assertEquals(2, cfs.metric.rowsMutated.getCount());
}
}

private ColumnFamilyStore currentTableStore()
{
return ColumnFamilyStore.getIfExists(KEYSPACE, currentTable());
}

@Test
public void shouldRecordWriteMetricsForCellDeletes()
{
Expand Down