diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel index a48432ec2931..74ddd7410b98 100644 --- a/pkg/server/status/BUILD.bazel +++ b/pkg/server/status/BUILD.bazel @@ -68,6 +68,7 @@ go_library( "//pkg/util/log/logmetrics", "//pkg/util/log/severity", "//pkg/util/metric", + "//pkg/util/metric/aggmetric", "//pkg/util/syncutil", "//pkg/util/system", "@com_github_cockroachdb_crlib//crtime", diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index acb03ce9627a..8d4d5fbdea51 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "hash/fnv" "io" "math" "os" @@ -44,6 +45,7 @@ import ( // metrics functionality into pkg/util/log. _ "github.com/cockroachdb/cockroach/pkg/util/log/logmetrics" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/system" "github.com/cockroachdb/errors" @@ -189,6 +191,10 @@ type MetricsRecorder struct { // round-trip) that requires a mutex to be safe for concurrent usage. We // therefore give it its own mutex to avoid blocking other methods. writeSummaryMu syncutil.Mutex + + // childMetricNameCache caches the encoded names for child metrics to avoid + // rebuilding them on every recording. Uses syncutil.Map for lock-free reads. + childMetricNameCache syncutil.Map[uint64, string] } // NewMetricsRecorder initializes a new MetricsRecorder object that uses the @@ -467,7 +473,7 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie source: mr.mu.desc.NodeID.String(), timestampNanos: now.UnixNano(), } - recorder.recordChangefeedChildMetrics(&data) + recorder.recordChangefeedChildMetrics(&data, &mr.childMetricNameCache) // Record child metrics from app-level registries for secondary tenants for tenantID, r := range mr.mu.tenantRegistries { @@ -477,7 +483,7 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()), timestampNanos: now.UnixNano(), } - tenantRecorder.recordChangefeedChildMetrics(&data) + tenantRecorder.recordChangefeedChildMetrics(&data, &mr.childMetricNameCache) } atomic.CompareAndSwapInt64(&mr.lastDataCount, lastDataCount, int64(len(data))) @@ -801,6 +807,25 @@ type registryRecorder struct { timestampNanos int64 } +// allowedChangefeedMetrics is the list of changefeed metrics that should have +// child metrics collected and recorded to TSDB. This is a curated list to prevent +// unbounded cardinality while still capturing the most important per-changefeed metrics. +var allowedChangefeedMetrics = map[string]struct{}{ + "changefeed.max_behind_nanos": {}, + "changefeed.error_retries": {}, + "changefeed.internal_retry_message_count": {}, + "changefeed.stage.downstream_client_send.latency": {}, + "changefeed.emitted_messages": {}, + "changefeed.sink_backpressure_nanos": {}, + "changefeed.backfill_pending_ranges": {}, + "changefeed.sink_io_inflight": {}, + "changefeed.lagging_ranges": {}, + "changefeed.aggregator_progress": {}, + "changefeed.checkpoint_progress": {}, + "changefeed.emitted_batch_sizes": {}, + "changefeed.total_ranges": {}, +} + // extractValue extracts the metric value(s) for the given metric and passes it, along with the metric name, to the // provided callback function. func extractValue(name string, mtr interface{}, fn func(string, float64)) error { @@ -929,30 +954,125 @@ func (rr registryRecorder) recordChild( }) } +func hashLabels(labels []*prometheusgo.LabelPair) uint64 { + h := fnv.New64a() + for _, label := range labels { + h.Write([]byte(label.GetName())) + h.Write([]byte(label.GetValue())) + } + return h.Sum64() +} + +// getOrComputeMetricName looks up the encoded metric name in the cache, +// or computes it using the provided computeFn if not found. +func getOrComputeMetricName( + cache *syncutil.Map[uint64, string], labels []*prometheusgo.LabelPair, computeFn func() string, +) string { + if cache == nil { + return computeFn() + } + labelHash := hashLabels(labels) + if cached, ok := cache.Load(labelHash); ok { + return *cached + } + name := computeFn() + cache.Store(labelHash, &name) + return name +} + // recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics // for those that have TsdbRecordLabeled set to true in their metadata. -// Records up to 1024 child metrics to prevent unbounded memory usage and performance issues. -// -// NB: Only available for Counter and Gauge metrics. -func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesData) { +// Records up to 1024 child metrics per metric to prevent unbounded memory usage and performance issues. +func (rr registryRecorder) recordChangefeedChildMetrics( + dest *[]tspb.TimeSeriesData, cache *syncutil.Map[uint64, string], +) { maxChildMetricsPerMetric := 1024 labels := rr.registry.GetLabels() rr.registry.Each(func(name string, v interface{}) { + if _, allowed := allowedChangefeedMetrics[name]; !allowed { + return + } // Check if the metric has child collection enabled in its metadata - iterable, ok := v.(metric.Iterable) - if !ok { - // If we can't get metadata, skip child collection for safety + iterable, isIterable := v.(metric.Iterable) + if !isIterable { return } metadata := iterable.GetMetadata() if !metadata.GetTsdbRecordLabeled() { return // Skip this metric if child collection is not enabled } - if metadata.Category != metric.Metadata_CHANGEFEEDS { + + recordMetric := func(name string, value float64) { + *dest = append(*dest, tspb.TimeSeriesData{ + Name: fmt.Sprintf(rr.format, name), + Source: rr.source, + Datapoints: []tspb.TimeSeriesDatapoint{ + { + TimestampNanos: rr.timestampNanos, + Value: value, + }, + }, + }) + } + + // Handle AggHistogram - use direct child access for per-child snapshots + if aggHist, isAggHist := v.(*aggmetric.AggHistogram); isAggHist { + var childMetricsCount int + aggHist.EachChild(func(labelNames, labelVals []string, child *aggmetric.Histogram) { + if childMetricsCount >= maxChildMetricsPerMetric { + return + } + + // Create label pairs for this child + childLabels := make([]*prometheusgo.LabelPair, len(labels), len(labels)+len(labelVals)) + copy(childLabels, labels) + + for i, val := range labelVals { + if i < len(labelNames) { + name := labelNames[i] + value := val + childLabels = append(childLabels, &prometheusgo.LabelPair{ + Name: &name, + Value: &value, + }) + } + } + + // Get per-child snapshots + cumulativeSnapshot := child.CumulativeSnapshot() + windowedSnapshot := child.WindowedSnapshot() + + // Check if we got valid snapshots by checking the sample count + // Empty snapshots will have count=0, which is safe to record but may not be meaningful + // This check ensures we handle cases where the histogram isn't properly initialized + cumulativeCount, _ := cumulativeSnapshot.Total() + windowedCount, _ := windowedSnapshot.Total() + if cumulativeCount < 0 || windowedCount < 0 { + // Skip malformed snapshots to avoid recording bad data + return + } + + // Check cache for encoded name + baseName := getOrComputeMetricName(cache, childLabels, func() string { + return metadata.Name + metric.EncodeLabeledName(&prometheusgo.Metric{Label: childLabels}) + }) + // Record all histogram computed metrics using child-specific snapshots + for _, c := range metric.HistogramMetricComputers { + var value float64 + if c.IsSummaryMetric { + value = c.ComputedMetric(windowedSnapshot) + } else { + value = c.ComputedMetric(cumulativeSnapshot) + } + recordMetric(baseName+c.Suffix, value) + } + childMetricsCount++ + }) return } + // Handle Counter and Gauge metrics via Prometheus export prom, ok := v.(metric.PrometheusExportable) if !ok { return @@ -967,7 +1087,7 @@ func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesD var childMetricsCount int processChildMetric := func(childMetric *prometheusgo.Metric) { if childMetricsCount >= maxChildMetricsPerMetric { - return // Stop processing once we hit the limit + return } var value float64 @@ -978,16 +1098,13 @@ func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesD } else { return } - *dest = append(*dest, tspb.TimeSeriesData{ - Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+metric.EncodeLabeledName(childMetric)), - Source: rr.source, - Datapoints: []tspb.TimeSeriesDatapoint{ - { - TimestampNanos: rr.timestampNanos, - Value: value, - }, - }, + + // Check cache for encoded name + metricName := getOrComputeMetricName(cache, childMetric.Label, func() string { + return prom.GetName(false /* useStaticLabels */) + metric.EncodeLabeledName(childMetric) }) + + recordMetric(metricName, value) childMetricsCount++ } promIter.Each(m.Label, processChildMetric) diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index 0ba74d3a4eca..046df9175c8b 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -8,6 +8,7 @@ package status import ( "bytes" "context" + "fmt" "io" "math/rand" "os" @@ -263,11 +264,10 @@ func TestMetricsRecorderLabels(t *testing.T) { // ======================================== // Verify that GetTimeSeriesData with includeChildMetrics=true returns child labels // ======================================== - // Add aggmetrics with child labels to the app registry aggCounter := aggmetric.NewCounter( metric.Metadata{ - Name: "changefeed.agg_counter", + Name: "changefeed.emitted_messages", Category: metric.Metadata_CHANGEFEEDS, }, "some-id", "feed_id", @@ -281,7 +281,7 @@ func TestMetricsRecorderLabels(t *testing.T) { aggGauge := aggmetric.NewGauge( metric.Metadata{ - Name: "changefeed.agg_gauge", + Name: "changefeed.lagging_ranges", Category: metric.Metadata_CHANGEFEEDS, }, "db", "status!!", @@ -293,56 +293,124 @@ func TestMetricsRecorderLabels(t *testing.T) { child4 := aggGauge.AddChild("yours", "paused") child4.Update(0) + aggHistogram := aggmetric.NewHistogram( + metric.HistogramOptions{ + Metadata: metric.Metadata{ + Name: "changefeed.stage.downstream_client_send.latency", + Category: metric.Metadata_CHANGEFEEDS, + }, + Duration: 10 * time.Second, + BucketConfig: metric.IOLatencyBuckets, + Mode: metric.HistogramModePrometheus, + }, + "scope", + ) + appReg.AddMetric(aggHistogram) + // Add child histogram metrics + child5 := aggHistogram.AddChild("default") + child5.RecordValue(100) + child5.RecordValue(200) + child6 := aggHistogram.AddChild("user") + child6.RecordValue(300) + child6.RecordValue(400) + child6.RecordValue(500) + // Enable the cluster setting for child metrics storage ChildMetricsStorageEnabled.Override(context.Background(), &st.SV, true) // Get time series data with child metrics enabled childData := recorder.GetTimeSeriesData(true) - var foundCounterFeedA123 bool - var foundCounterFeedB456 bool - var foundGaugeMineActive bool - var foundGaugeYoursPaused bool - - for _, ts := range childData { - // Check for changefeed.agg_counter with some_id and feed_id labels - if strings.Contains(ts.Name, "cr.node.changefeed.agg_counter") { - if strings.Contains(ts.Name, "feed_id=\"feed-a\"") && strings.Contains(ts.Name, "some_id=\"123\"") { - foundCounterFeedA123 = true - require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(100), ts.Datapoints[0].Value) - } - if strings.Contains(ts.Name, "feed_id=\"feed-b\"") && strings.Contains(ts.Name, "some_id=\"456\"") { - foundCounterFeedB456 = true - require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(200), ts.Datapoints[0].Value) - } - } + appMetricTestCases := []struct { + name string + metricPrefix string + labelMatchers []string + expectedSource string + expectedValue float64 + }{ + { + name: "counter feed-a with some_id=123", + metricPrefix: "cr.node.changefeed.emitted_messages", + labelMatchers: []string{"feed_id=\"feed-a\"", "some_id=\"123\""}, + expectedSource: "7", + expectedValue: 100, + }, + { + name: "counter feed-b with some_id=456", + metricPrefix: "cr.node.changefeed.emitted_messages", + labelMatchers: []string{"feed_id=\"feed-b\"", "some_id=\"456\""}, + expectedSource: "7", + expectedValue: 200, + }, + { + name: "gauge mine with status=active", + metricPrefix: "cr.node.changefeed.lagging_ranges", + labelMatchers: []string{"db=\"mine\"", "status__=\"active\""}, + expectedSource: "7", + expectedValue: 1, + }, + { + name: "gauge yours with status=paused", + metricPrefix: "cr.node.changefeed.lagging_ranges", + labelMatchers: []string{"db=\"yours\"", "status__=\"paused\""}, + expectedSource: "7", + expectedValue: 0, + }, + { + name: "histogram default -count", + metricPrefix: "cr.node.changefeed.stage.downstream_client_send.latency", + labelMatchers: []string{"scope=\"default\"", "-count"}, + expectedSource: "7", + expectedValue: 2, + }, + { + name: "histogram user -count", + metricPrefix: "cr.node.changefeed.stage.downstream_client_send.latency", + labelMatchers: []string{"scope=\"user\"", "-count"}, + expectedSource: "7", + expectedValue: 3, + }, + { + name: "histogram default -avg", + metricPrefix: "cr.node.changefeed.stage.downstream_client_send.latency", + labelMatchers: []string{"scope=\"default\"", "-avg"}, + expectedSource: "7", + expectedValue: 150, + }, + { + name: "histogram user -avg", + metricPrefix: "cr.node.changefeed.stage.downstream_client_send.latency", + labelMatchers: []string{"scope=\"user\"", "-avg"}, + expectedSource: "7", + expectedValue: 400, + }, + } - // Check for changefeed.agg_gauge with db and status labels - if strings.Contains(ts.Name, "cr.node.changefeed.agg_gauge") { - if strings.Contains(ts.Name, "db=\"mine\"") && strings.Contains(ts.Name, "status__=\"active\"") { - foundGaugeMineActive = true - require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(1), ts.Datapoints[0].Value) + for _, tc := range appMetricTestCases { + var found bool + for _, ts := range childData { + if !strings.Contains(ts.Name, tc.metricPrefix) { + continue + } + allMatch := true + for _, matcher := range tc.labelMatchers { + if !strings.Contains(ts.Name, matcher) { + allMatch = false + break + } } - if strings.Contains(ts.Name, "db=\"yours\"") && strings.Contains(ts.Name, "status__=\"paused\"") { - foundGaugeYoursPaused = true - require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(0), ts.Datapoints[0].Value) + if !allMatch { + continue } + found = true + require.Equal(t, tc.expectedSource, ts.Source, "Expected source for %s", tc.name) + require.Len(t, ts.Datapoints, 1, "Expected 1 datapoint for %s", tc.name) + require.Equal(t, tc.expectedValue, ts.Datapoints[0].Value, "Expected value for %s", tc.name) + break } + require.True(t, found, "Expected to find %s", tc.name) } - require.True(t, foundCounterFeedA123, "Expected to find changefeed.agg_counter with some_id=123 and feed_id=feed-a") - require.True(t, foundCounterFeedB456, "Expected to find changefeed.agg_counter with some_id=456 and feed_id=feed-b") - require.True(t, foundGaugeMineActive, "Expected to find changefeed.agg_gauge with db=mine and status=active") - require.True(t, foundGaugeYoursPaused, "Expected to find changefeed.agg_gauge with db=yours and status=paused") - // ======================================== // Verify that tenant changefeed child metrics are collected with proper source // ======================================== @@ -367,7 +435,7 @@ func TestMetricsRecorderLabels(t *testing.T) { tenantAggGauge := aggmetric.NewGauge( metric.Metadata{ - Name: "changefeed.running", + Name: "changefeed.backfill_pending_ranges", TsdbRecordLabeled: &tsdbRecordLabeled, Category: metric.Metadata_CHANGEFEEDS, }, @@ -383,48 +451,68 @@ func TestMetricsRecorderLabels(t *testing.T) { // Get time series data with child metrics enabled tenantChildData := recorder.GetTimeSeriesData(true) - var foundTenantCounterFeed1 bool - var foundTenantCounterFeed2 bool - var foundTenantGaugeDefault bool - var foundTenantGaugeUser bool - - for _, ts := range tenantChildData { - // Verify tenant changefeed metrics have correct source (7-123 for node 7, tenant 123) - if strings.Contains(ts.Name, "cr.node.changefeed.emitted_messages") { - if strings.Contains(ts.Name, `scope="default"`) && strings.Contains(ts.Name, `feed_id="tenant-feed-1"`) { - foundTenantCounterFeed1 = true - require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(500), ts.Datapoints[0].Value) - } - if strings.Contains(ts.Name, `scope="user"`) && strings.Contains(ts.Name, `feed_id="tenant-feed-2"`) { - foundTenantCounterFeed2 = true - require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(750), ts.Datapoints[0].Value) - } - } + tenantMetricTestCases := []struct { + name string + metricPrefix string + labelMatchers []string + expectedSource string + expectedValue float64 + }{ + { + name: "tenant counter default with tenant-feed-1", + metricPrefix: "cr.node.changefeed.emitted_messages", + labelMatchers: []string{`scope="default"`, `feed_id="tenant-feed-1"`}, + expectedSource: "7-123", + expectedValue: 500, + }, + { + name: "tenant counter user with tenant-feed-2", + metricPrefix: "cr.node.changefeed.emitted_messages", + labelMatchers: []string{`scope="user"`, `feed_id="tenant-feed-2"`}, + expectedSource: "7-123", + expectedValue: 750, + }, + { + name: "tenant gauge default", + metricPrefix: "cr.node.changefeed.backfill_pending_ranges", + labelMatchers: []string{`scope="default"`}, + expectedSource: "7-123", + expectedValue: 2, + }, + { + name: "tenant gauge user", + metricPrefix: "cr.node.changefeed.backfill_pending_ranges", + labelMatchers: []string{`scope="user"`}, + expectedSource: "7-123", + expectedValue: 1, + }, + } - if strings.Contains(ts.Name, "cr.node.changefeed.running") { - if strings.Contains(ts.Name, `scope="default"`) { - foundTenantGaugeDefault = true - require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(2), ts.Datapoints[0].Value) + for _, tc := range tenantMetricTestCases { + var found bool + for _, ts := range tenantChildData { + if !strings.Contains(ts.Name, tc.metricPrefix) { + continue + } + // Check if all label matchers are present + allMatch := true + for _, matcher := range tc.labelMatchers { + if !strings.Contains(ts.Name, matcher) { + allMatch = false + break + } } - if strings.Contains(ts.Name, `scope="user"`) { - foundTenantGaugeUser = true - require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") - require.Len(t, ts.Datapoints, 1) - require.Equal(t, float64(1), ts.Datapoints[0].Value) + if !allMatch { + continue } + found = true + require.Equal(t, tc.expectedSource, ts.Source, "Expected source for %s", tc.name) + require.Len(t, ts.Datapoints, 1, "Expected 1 datapoint for %s", tc.name) + require.Equal(t, tc.expectedValue, ts.Datapoints[0].Value, "Expected value for %s", tc.name) + break } + require.True(t, found, "Expected to find %s", tc.name) } - - require.True(t, foundTenantCounterFeed1, "Expected to find tenant changefeed.emitted_messages with scope=default and feed_id=tenant-feed-1") - require.True(t, foundTenantCounterFeed2, "Expected to find tenant changefeed.emitted_messages with scope=user and feed_id=tenant-feed-2") - require.True(t, foundTenantGaugeDefault, "Expected to find tenant changefeed.running with scope=default") - require.True(t, foundTenantGaugeUser, "Expected to find tenant changefeed.running with scope=user") } func TestRegistryRecorder_RecordChild(t *testing.T) { @@ -947,7 +1035,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) require.Empty(t, dest) }) @@ -972,7 +1060,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) require.Empty(t, dest) }) @@ -984,7 +1072,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { tsdbRecordLabeled := false gauge := aggmetric.NewGauge( metric.Metadata{ - Name: "changefeed.test_metric", + Name: "changefeed.error_retries", TsdbRecordLabeled: &tsdbRecordLabeled, Category: metric.Metadata_CHANGEFEEDS, }, @@ -1004,7 +1092,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) // Should be empty since TsdbRecordLabeled is false require.Empty(t, dest) @@ -1013,10 +1101,9 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { t.Run("changefeed aggmetrics with child collection", func(t *testing.T) { reg := metric.NewRegistry() - // Create an aggmetric which supports child metrics - // Note: aggmetrics automatically have child collection capabilities + // Create an aggmetric which supports child metrics and is in the allowed list gauge := aggmetric.NewGauge(metric.Metadata{ - Name: "changefeed.test_metric", + Name: "changefeed.max_behind_nanos", Category: metric.Metadata_CHANGEFEEDS, }, "job_id", "feed_id") reg.AddMetric(gauge) @@ -1036,14 +1123,14 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) - // Should have data for child metrics (aggmetrics have child collection enabled by default) + // Should have data for child metrics (TsdbRecordLabeled is defaulted true and metric is in allowed list) require.Len(t, dest, 2) // Verify the data structure for _, data := range dest { - require.Contains(t, data.Name, "changefeed.test_metric") + require.Contains(t, data.Name, "changefeed.max_behind_nanos") require.Equal(t, "test-source", data.Source) require.Len(t, data.Datapoints, 1) require.Equal(t, manual.Now().UnixNano(), data.Datapoints[0].TimestampNanos) @@ -1055,7 +1142,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { reg := metric.NewRegistry() gauge := aggmetric.NewGauge(metric.Metadata{ - Name: "changefeed.high_cardinality_metric", + Name: "changefeed.total_ranges", Category: metric.Metadata_CHANGEFEEDS, }, "job_id") reg.AddMetric(gauge) @@ -1074,7 +1161,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) // Should be limited to 1024 child metrics require.Len(t, dest, 1024) @@ -1084,7 +1171,7 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { reg := metric.NewRegistry() gauge := aggmetric.NewGauge(metric.Metadata{ - Name: "changefeed.label_test", + Name: "changefeed.aggregator_progress", Category: metric.Metadata_CHANGEFEEDS, }, "job-id", "feed.name") reg.AddMetric(gauge) @@ -1101,21 +1188,21 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) require.Len(t, dest, 1) // Verify that the metric name contains sanitized labels metricName := dest[0].Name - require.Contains(t, metricName, "changefeed.label_test{feed_name=\"my-feed!\", job_id=\"test@123\"}") + require.Contains(t, metricName, "changefeed.aggregator_progress{feed_name=\"my-feed!\", job_id=\"test@123\"}") }) - t.Run("counter and gauge support", func(t *testing.T) { + t.Run("counter, gauge, histogram support", func(t *testing.T) { reg := metric.NewRegistry() // Test with gauge gauge := aggmetric.NewGauge(metric.Metadata{ - Name: "changefeed.gauge_metric", + Name: "changefeed.checkpoint_progress", Category: metric.Metadata_CHANGEFEEDS, }, "type") reg.AddMetric(gauge) @@ -1124,13 +1211,32 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { // Test with counter counter := aggmetric.NewCounter(metric.Metadata{ - Name: "changefeed.counter_metric", + Name: "changefeed.internal_retry_message_count", Category: metric.Metadata_CHANGEFEEDS, }, "type") reg.AddMetric(counter) counterChild := counter.AddChild("counter") counterChild.Inc(50) + // Test with histogram + histogram := aggmetric.NewHistogram( + metric.HistogramOptions{ + Metadata: metric.Metadata{ + Name: "changefeed.emitted_batch_sizes", + Category: metric.Metadata_CHANGEFEEDS, + }, + Duration: 10 * time.Second, + BucketConfig: metric.IOLatencyBuckets, + Mode: metric.HistogramModePrometheus, + }, + "scope", + ) + reg.AddMetric(histogram) + histChild := histogram.AddChild("default") + histChild.RecordValue(100) + histChild.RecordValue(200) + histChild.RecordValue(300) + recorder := registryRecorder{ registry: reg, format: nodeTimeSeriesPrefix, @@ -1139,9 +1245,9 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { } var dest []tspb.TimeSeriesData - recorder.recordChangefeedChildMetrics(&dest) + recorder.recordChangefeedChildMetrics(&dest, nil) - require.Len(t, dest, 2) + require.Len(t, dest, 13) // 1 gauge + 1 counter + 11 histogram metrics // Find gauge and counter data points var gaugeValue, counterValue float64 @@ -1155,5 +1261,68 @@ func TestRecordChangefeedChildMetrics(t *testing.T) { require.Equal(t, float64(100), gaugeValue) require.Equal(t, float64(50), counterValue) + + // Verify that histogram suffixes are present + foundSuffixes := make(map[string]bool) + expectedSuffixes := []string{"-count", "-sum", "-p50", "-p75", "-p90", "-p99", "-p99.9", "-p99.99", "-p99.999", "-max", "-avg"} + + for _, data := range dest { + for _, suffix := range expectedSuffixes { + if strings.Contains(data.Name, suffix) { + foundSuffixes[suffix] = true + } + } + } + + require.Equal(t, len(expectedSuffixes), len(foundSuffixes), "Expected to find histogram metric suffixes") }) } + +func BenchmarkRecordChangefeedChildMetrics(b *testing.B) { + manual := timeutil.NewManualTime(timeutil.Unix(0, 100)) + enableChildCollection := true + + // Get metrics from the allowed list and convert to slice for indexing + allowedMetricsList := make([]string, 0, len(allowedChangefeedMetrics)) + for metricName := range allowedChangefeedMetrics { + allowedMetricsList = append(allowedMetricsList, metricName) + } + + // Benchmark with varying numbers of child metrics + for childCount := 10; childCount <= 1024; childCount *= 10 { + b.Run(fmt.Sprintf("children-%d", childCount), func(b *testing.B) { + reg := metric.NewRegistry() + + // Create a single gauge with varying numbers of children + gauge := aggmetric.NewGauge( + metric.Metadata{ + Name: allowedMetricsList[0], + TsdbRecordLabeled: &enableChildCollection, + Category: metric.Metadata_CHANGEFEEDS, + }, + "job_id", + ) + reg.AddMetric(gauge) + + for i := 0; i < childCount; i++ { + child := gauge.AddChild(strconv.Itoa(i)) + child.Update(int64(rand.Intn(1000))) + } + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest, nil) + } + }) + } +} diff --git a/pkg/util/metric/aggmetric/histogram.go b/pkg/util/metric/aggmetric/histogram.go index 25450f0e7e68..4c0f92ee6784 100644 --- a/pkg/util/metric/aggmetric/histogram.go +++ b/pkg/util/metric/aggmetric/histogram.go @@ -153,6 +153,18 @@ func (a *AggHistogram) AddChild(labelVals ...string) *Histogram { return child } +// EachChild iterates over all child histograms, calling the provided function for each one. +// This allows direct access to child histogram objects and their individual snapshots. +// The labelNames parameter will be set to the label names configured for this histogram. +func (a *AggHistogram) EachChild(f func(labelNames, labelVals []string, child *Histogram)) { + labelNames := a.labels + a.apply(func(item MetricItem) { + if h, ok := item.(*Histogram); ok { + f(labelNames, h.labelValues(), h) + } + }) +} + // Histogram is a child of a AggHistogram. When values are recorded, so too is the // parent. When metrics are collected by prometheus, each of the children will // appear with a distinct label, however, when cockroach internally collects @@ -188,6 +200,24 @@ func (g *Histogram) RecordValue(v int64) { g.parent.h.RecordValue(v) } +// CumulativeSnapshot returns the cumulative snapshot for this child histogram. +// Returns an empty snapshot if the underlying histogram doesn't implement CumulativeHistogram. +func (g *Histogram) CumulativeSnapshot() metric.HistogramSnapshot { + if ch, ok := g.h.(metric.CumulativeHistogram); ok { + return ch.CumulativeSnapshot() + } + return metric.HistogramSnapshot{} +} + +// WindowedSnapshot returns the windowed snapshot for this child histogram. +// Returns an empty snapshot if the underlying histogram doesn't implement WindowedHistogram. +func (g *Histogram) WindowedSnapshot() metric.HistogramSnapshot { + if wh, ok := g.h.(metric.WindowedHistogram); ok { + return wh.WindowedSnapshot() + } + return metric.HistogramSnapshot{} +} + // SQLHistogram maintains a histogram as the sum of its children. The histogram will // report to crdb-internal time series only the aggregate sum of all of its // children, while its children are additionally exported to prometheus via the