Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/status/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ go_library(
"//pkg/util/log/logmetrics",
"//pkg/util/log/severity",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/syncutil",
"//pkg/util/system",
"@com_github_cockroachdb_crlib//crtime",
Expand Down
157 changes: 137 additions & 20 deletions pkg/server/status/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"math"
"os"
Expand Down Expand Up @@ -44,6 +45,7 @@ import (
// metrics functionality into pkg/util/log.
_ "github.com/cockroachdb/cockroach/pkg/util/log/logmetrics"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/system"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -189,6 +191,10 @@ type MetricsRecorder struct {
// round-trip) that requires a mutex to be safe for concurrent usage. We
// therefore give it its own mutex to avoid blocking other methods.
writeSummaryMu syncutil.Mutex

// childMetricNameCache caches the encoded names for child metrics to avoid
// rebuilding them on every recording. Uses syncutil.Map for lock-free reads.
childMetricNameCache syncutil.Map[uint64, string]
}

// NewMetricsRecorder initializes a new MetricsRecorder object that uses the
Expand Down Expand Up @@ -467,7 +473,7 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie
source: mr.mu.desc.NodeID.String(),
timestampNanos: now.UnixNano(),
}
recorder.recordChangefeedChildMetrics(&data)
recorder.recordChangefeedChildMetrics(&data, &mr.childMetricNameCache)

// Record child metrics from app-level registries for secondary tenants
for tenantID, r := range mr.mu.tenantRegistries {
Expand All @@ -477,7 +483,7 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie
source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()),
timestampNanos: now.UnixNano(),
}
tenantRecorder.recordChangefeedChildMetrics(&data)
tenantRecorder.recordChangefeedChildMetrics(&data, &mr.childMetricNameCache)
}

atomic.CompareAndSwapInt64(&mr.lastDataCount, lastDataCount, int64(len(data)))
Expand Down Expand Up @@ -801,6 +807,25 @@ type registryRecorder struct {
timestampNanos int64
}

// allowedChangefeedMetrics is the list of changefeed metrics that should have
// child metrics collected and recorded to TSDB. This is a curated list to prevent
// unbounded cardinality while still capturing the most important per-changefeed metrics.
var allowedChangefeedMetrics = map[string]struct{}{
"changefeed.max_behind_nanos": {},
"changefeed.error_retries": {},
"changefeed.internal_retry_message_count": {},
"changefeed.stage.downstream_client_send.latency": {},
"changefeed.emitted_messages": {},
"changefeed.sink_backpressure_nanos": {},
"changefeed.backfill_pending_ranges": {},
"changefeed.sink_io_inflight": {},
"changefeed.lagging_ranges": {},
"changefeed.aggregator_progress": {},
"changefeed.checkpoint_progress": {},
"changefeed.emitted_batch_sizes": {},
"changefeed.total_ranges": {},
}

// extractValue extracts the metric value(s) for the given metric and passes it, along with the metric name, to the
// provided callback function.
func extractValue(name string, mtr interface{}, fn func(string, float64)) error {
Expand Down Expand Up @@ -929,30 +954,125 @@ func (rr registryRecorder) recordChild(
})
}

func hashLabels(labels []*prometheusgo.LabelPair) uint64 {
h := fnv.New64a()
for _, label := range labels {
h.Write([]byte(label.GetName()))
h.Write([]byte(label.GetValue()))
}
return h.Sum64()
}

// getOrComputeMetricName looks up the encoded metric name in the cache,
// or computes it using the provided computeFn if not found.
func getOrComputeMetricName(
cache *syncutil.Map[uint64, string], labels []*prometheusgo.LabelPair, computeFn func() string,
) string {
if cache == nil {
return computeFn()
}
labelHash := hashLabels(labels)
if cached, ok := cache.Load(labelHash); ok {
return *cached
}
name := computeFn()
cache.Store(labelHash, &name)
return name
}

// recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics
// for those that have TsdbRecordLabeled set to true in their metadata.
// Records up to 1024 child metrics to prevent unbounded memory usage and performance issues.
//
// NB: Only available for Counter and Gauge metrics.
func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesData) {
// Records up to 1024 child metrics per metric to prevent unbounded memory usage and performance issues.
func (rr registryRecorder) recordChangefeedChildMetrics(
dest *[]tspb.TimeSeriesData, cache *syncutil.Map[uint64, string],
) {
maxChildMetricsPerMetric := 1024

labels := rr.registry.GetLabels()
rr.registry.Each(func(name string, v interface{}) {
if _, allowed := allowedChangefeedMetrics[name]; !allowed {
return
}
// Check if the metric has child collection enabled in its metadata
iterable, ok := v.(metric.Iterable)
if !ok {
// If we can't get metadata, skip child collection for safety
iterable, isIterable := v.(metric.Iterable)
if !isIterable {
return
}
metadata := iterable.GetMetadata()
if !metadata.GetTsdbRecordLabeled() {
return // Skip this metric if child collection is not enabled
}
if metadata.Category != metric.Metadata_CHANGEFEEDS {

recordMetric := func(name string, value float64) {
*dest = append(*dest, tspb.TimeSeriesData{
Name: fmt.Sprintf(rr.format, name),
Source: rr.source,
Datapoints: []tspb.TimeSeriesDatapoint{
{
TimestampNanos: rr.timestampNanos,
Value: value,
},
},
})
}

// Handle AggHistogram - use direct child access for per-child snapshots
if aggHist, isAggHist := v.(*aggmetric.AggHistogram); isAggHist {
var childMetricsCount int
aggHist.EachChild(func(labelNames, labelVals []string, child *aggmetric.Histogram) {
if childMetricsCount >= maxChildMetricsPerMetric {
return
}

// Create label pairs for this child
childLabels := make([]*prometheusgo.LabelPair, len(labels), len(labels)+len(labelVals))
copy(childLabels, labels)

for i, val := range labelVals {
if i < len(labelNames) {
name := labelNames[i]
value := val
childLabels = append(childLabels, &prometheusgo.LabelPair{
Name: &name,
Value: &value,
})
}
}

// Get per-child snapshots
cumulativeSnapshot := child.CumulativeSnapshot()
windowedSnapshot := child.WindowedSnapshot()

// Check if we got valid snapshots by checking the sample count
// Empty snapshots will have count=0, which is safe to record but may not be meaningful
// This check ensures we handle cases where the histogram isn't properly initialized
cumulativeCount, _ := cumulativeSnapshot.Total()
windowedCount, _ := windowedSnapshot.Total()
if cumulativeCount < 0 || windowedCount < 0 {
// Skip malformed snapshots to avoid recording bad data
return
}

// Check cache for encoded name
baseName := getOrComputeMetricName(cache, childLabels, func() string {
return metadata.Name + metric.EncodeLabeledName(&prometheusgo.Metric{Label: childLabels})
})
// Record all histogram computed metrics using child-specific snapshots
for _, c := range metric.HistogramMetricComputers {
var value float64
if c.IsSummaryMetric {
value = c.ComputedMetric(windowedSnapshot)
} else {
value = c.ComputedMetric(cumulativeSnapshot)
}
recordMetric(baseName+c.Suffix, value)
}
childMetricsCount++
})
return
}

// Handle Counter and Gauge metrics via Prometheus export
prom, ok := v.(metric.PrometheusExportable)
if !ok {
return
Expand All @@ -967,7 +1087,7 @@ func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesD
var childMetricsCount int
processChildMetric := func(childMetric *prometheusgo.Metric) {
if childMetricsCount >= maxChildMetricsPerMetric {
return // Stop processing once we hit the limit
return
}

var value float64
Expand All @@ -978,16 +1098,13 @@ func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesD
} else {
return
}
*dest = append(*dest, tspb.TimeSeriesData{
Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+metric.EncodeLabeledName(childMetric)),
Source: rr.source,
Datapoints: []tspb.TimeSeriesDatapoint{
{
TimestampNanos: rr.timestampNanos,
Value: value,
},
},

// Check cache for encoded name
metricName := getOrComputeMetricName(cache, childMetric.Label, func() string {
return prom.GetName(false /* useStaticLabels */) + metric.EncodeLabeledName(childMetric)
})

recordMetric(metricName, value)
childMetricsCount++
}
promIter.Each(m.Label, processChildMetric)
Expand Down
Loading