From 5f9cc2c0f485e0e8c8599c71ca1f7b327370b561 Mon Sep 17 00:00:00 2001 From: Nada Shaban Date: Tue, 31 Mar 2026 15:01:29 +0200 Subject: [PATCH] optimize LoadSources --- internal/reaper/reaper.go | 17 +++- internal/reaper/reaper_test.go | 178 ++++++++++++++++++++++++--------- internal/sources/types.go | 12 +++ internal/sources/types_test.go | 152 ++++++++++++++++++++++++++++ 4 files changed, 311 insertions(+), 48 deletions(-) diff --git a/internal/reaper/reaper.go b/internal/reaper/reaper.go index 741775c28d..0cda96af8b 100644 --- a/internal/reaper/reaper.go +++ b/internal/reaper/reaper.go @@ -421,8 +421,23 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) { newSrcs[i] = md continue } + if md.IsSameConnection(newMD.Source) { + // same connection config, then update the existing struct with new metrics and tags + r.logger.WithField("source", md.Name).Info("Metric OR Tag configs changed, updating without full restart") + + // lock to ensure data integrity + md.Lock() + md.Metrics = newMD.Metrics + md.MetricsStandby = newMD.MetricsStandby + md.PresetMetrics = newMD.PresetMetrics + md.PresetMetricsStandby = newMD.PresetMetricsStandby + md.CustomTags = newMD.CustomTags + md.Unlock() + newSrcs[i] = md + continue + } + // Source configs changed, stop all running gatherers to trigger a restart - // TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...") r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true}) } diff --git a/internal/reaper/reaper_test.go b/internal/reaper/reaper_test.go index 9370422c1f..1b2402a712 100644 --- a/internal/reaper/reaper_test.go +++ b/internal/reaper/reaper_test.go @@ -100,7 +100,7 @@ func TestReaper_LoadSources(t *testing.T) { assert.Equal(t, 5, len(r.monitoredSources), "Expected five monitored sources after resetting groups") }) - t.Run("Test source config changes trigger restart", 
func(t *testing.T) { + t.Run("Test source connection config changes trigger restart", func(t *testing.T) { baseSource := sources.Source{ Name: "TestSource", IsEnabled: true, @@ -115,108 +115,180 @@ func TestReaper_LoadSources(t *testing.T) { testCases := []struct { name string modifySource func(s *sources.Source) - expectCancel bool + }{ + { + name: "connection string change", + modifySource: func(s *sources.Source) { + s.ConnStr = "postgres://localhost:5433/newdb" + }, + }, + { + name: "group change", + modifySource: func(s *sources.Source) { + s.Group = "new-group" + }, + }, + { + name: "kind change", + modifySource: func(s *sources.Source) { + s.Kind = sources.SourcePgBouncer + }, + }, + { + name: "only if master change", + modifySource: func(s *sources.Source) { + s.OnlyIfMaster = true + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + initialSource := *baseSource.Clone() + initialReader := &testutil.MockSourcesReaderWriter{ + GetSourcesFunc: func() (sources.Sources, error) { + return sources.Sources{initialSource}, nil + }, + } + + r := NewReaper(ctx, &cmdopts.Options{ + SourcesReaderWriter: initialReader, + SinksWriter: &sinks.MultiWriter{}, + }) + assert.NoError(t, r.LoadSources(ctx)) + assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after initial load") + + mockConn, err := pgxmock.NewPool() + require.NoError(t, err) + mockConn.ExpectClose() + r.monitoredSources[0].Conn = mockConn + + // Add a mock cancel function for a metric gatherer + cancelCalled := make(map[string]bool) + for metric := range initialSource.Metrics { + dbMetric := initialSource.Name + "¤¤¤" + metric + r.cancelFuncs[dbMetric] = func() { + cancelCalled[dbMetric] = true + } + } + + // Create modified source + modifiedSource := *baseSource.Clone() + tc.modifySource(&modifiedSource) + + modifiedReader := &testutil.MockSourcesReaderWriter{ + GetSourcesFunc: func() (sources.Sources, error) { + return 
sources.Sources{modifiedSource}, nil + }, + } + r.SourcesReaderWriter = modifiedReader + + // Reload sources + assert.NoError(t, r.LoadSources(ctx)) + assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload") + assert.Equal(t, modifiedSource, r.monitoredSources[0].Source) + + for metric := range initialSource.Metrics { + dbMetric := initialSource.Name + "¤¤¤" + metric + // Since it is a connection config change, the cancel function should be called for each metric + assert.True(t, cancelCalled[dbMetric]) + + assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met") + _, exists := r.cancelFuncs[dbMetric] + assert.False(t, exists, "Expected cancel func to be removed from map after cancellation") + } + }) + } + }) + + t.Run("Test metric or tag config changes trigger update", func(t *testing.T) { + baseSource := sources.Source{ + Name: "TestSource", + IsEnabled: true, + Kind: sources.SourcePostgres, + ConnStr: "postgres://localhost:5432/testdb", + Metrics: map[string]float64{"cpu": 10, "memory": 20}, + MetricsStandby: map[string]float64{"cpu": 30}, + CustomTags: map[string]string{"env": "test"}, + Group: "default", + } + + testCases := []struct { + name string + modifySource func(s *sources.Source) + expectedCancelled []string // List of metrics that should be stopped }{ { name: "custom tags change", modifySource: func(s *sources.Source) { s.CustomTags = map[string]string{"env": "production"} }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new tags }, { name: "custom tags add new tag", modifySource: func(s *sources.Source) { s.CustomTags = map[string]string{"env": "test", "region": "us-east"} }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new tags }, { name: "custom tags remove tag", modifySource: func(s *sources.Source) { s.CustomTags = map[string]string{} }, - expectCancel: true, +
expectedCancelled: nil, // No metrics should be cancelled, just updated with new tags }, { name: "preset metrics change", modifySource: func(s *sources.Source) { s.PresetMetrics = "exhaustive" }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new preset that will be applied on next iteration }, { name: "preset standby metrics change", modifySource: func(s *sources.Source) { s.PresetMetricsStandby = "standby-preset" }, - expectCancel: true, - }, - { - name: "connection string change", - modifySource: func(s *sources.Source) { - s.ConnStr = "postgres://localhost:5433/newdb" - }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new preset that will be applied on next iteration }, { name: "custom metrics change interval", modifySource: func(s *sources.Source) { s.Metrics = map[string]float64{"cpu": 15, "memory": 20} }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new interval for cpu metric }, { name: "custom metrics add new metric", modifySource: func(s *sources.Source) { s.Metrics = map[string]float64{"cpu": 10, "memory": 20, "disk": 30} }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new metric }, { name: "custom metrics remove metric", modifySource: func(s *sources.Source) { s.Metrics = map[string]float64{"cpu": 10} }, - expectCancel: true, + expectedCancelled: []string{"memory"}, // Only memory metric should be cancelled }, { name: "standby metrics change", modifySource: func(s *sources.Source) { s.MetricsStandby = map[string]float64{"cpu": 60} }, - expectCancel: true, - }, - { - name: "group change", - modifySource: func(s *sources.Source) { - s.Group = "new-group" - }, - expectCancel: true, - }, - { - name: "kind change", - modifySource: func(s *sources.Source) { - s.Kind = sources.SourcePgBouncer - }, - expectCancel: true, - }, - { - name: "only 
if master change", - modifySource: func(s *sources.Source) { - s.OnlyIfMaster = true - }, - expectCancel: true, + expectedCancelled: nil, // No metrics should be cancelled, just updated with new standby metrics }, { name: "no change - same config", modifySource: func(_ *sources.Source) { // No modifications - source stays the same }, - expectCancel: false, + expectedCancelled: nil, // No metrics should be cancelled }, } - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { initialSource := *baseSource.Clone() @@ -235,7 +307,8 @@ func TestReaper_LoadSources(t *testing.T) { mockConn, err := pgxmock.NewPool() require.NoError(t, err) - mockConn.ExpectClose() + + // no close expectation set on mockConn here because we should reuse the same connection for metric or tag config changes r.monitoredSources[0].Conn = mockConn // Add a mock cancel function for a metric gatherer @@ -247,7 +320,7 @@ func TestReaper_LoadSources(t *testing.T) { } } - // Create modified source + // create modified source modifiedSource := *baseSource.Clone() tc.modifySource(&modifiedSource) @@ -258,16 +331,27 @@ func TestReaper_LoadSources(t *testing.T) { } r.SourcesReaderWriter = modifiedReader - // Reload sources + // reload sources assert.NoError(t, r.LoadSources(ctx)) + // call ShutdownOldWorkers to simulate actual behavior and ensure cancel functions are called + r.ShutdownOldWorkers(ctx, nil) + assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload") assert.Equal(t, modifiedSource, r.monitoredSources[0].Source) for metric := range initialSource.Metrics { dbMetric := initialSource.Name + "¤¤¤" + metric - assert.Equal(t, tc.expectCancel, cancelCalled[dbMetric]) - if tc.expectCancel { - assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met") + + shouldBeCancelled := false + for _, m := range tc.expectedCancelled { + if m == metric { + shouldBeCancelled = true + break + } + } + assert.Equal(t, shouldBeCancelled,
cancelCalled[dbMetric]) + assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met") + if shouldBeCancelled { _, exists := r.cancelFuncs[dbMetric] assert.False(t, exists, "Expected cancel func to be removed from map after cancellation") } diff --git a/internal/sources/types.go b/internal/sources/types.go index 2925907482..737099695e 100644 --- a/internal/sources/types.go +++ b/internal/sources/types.go @@ -121,6 +121,18 @@ func (s *Source) Clone() *Source { return c } +// to check if connection configs are the same, then update the metrics and tags without a full restart +func (s Source) IsSameConnection(s2 Source) bool { + return s.Name == s2.Name && + s.Group == s2.Group && + s.ConnStr == s2.ConnStr && + s.Kind == s2.Kind && + s.IsEnabled == s2.IsEnabled && + s.IncludePattern == s2.IncludePattern && + s.ExcludePattern == s2.ExcludePattern && + s.OnlyIfMaster == s2.OnlyIfMaster +} + type Reader interface { GetSources() (Sources, error) } diff --git a/internal/sources/types_test.go b/internal/sources/types_test.go index ffe57df9f1..6e4e92bcfa 100644 --- a/internal/sources/types_test.go +++ b/internal/sources/types_test.go @@ -118,3 +118,155 @@ func TestSource_Equal(t *testing.T) { assert.False(t, s1.Equal(srcs[3])) assert.False(t, s1.Equal(srcs[4])) } + +func TestSource_IsSameConnection(t *testing.T) { + s1 := sources.Source{ + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + Metrics: map[string]float64{"wal": 60}, + MetricsStandby: map[string]float64{"wal": 60}, + PresetMetrics: "basic", + PresetMetricsStandby: "basic", + CustomTags: map[string]string{"env": "prod"}, + } + + srcs := sources.Sources{ + { + // Same connection details and different metrics/presets/tags + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: 
sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + Metrics: map[string]float64{"db_size": 300}, + MetricsStandby: map[string]float64{"db_size": 300}, + PresetMetrics: "exhaustive", + PresetMetricsStandby: "exhaustive", + CustomTags: map[string]string{"env": "staging"}, + }, + { + // Different name "different connection" + Name: "different_name", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + }, + { + // Different group "different connection" + Name: "test_db", + Group: "different_group", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + }, + { + // different connection string "different connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@remote:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + }, + { + // different kind "different connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePgBouncer, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + }, + { + // different include pattern "different connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "dev_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + }, + { + // different exclude pattern "different connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "ignore_*", + 
IsEnabled: true, + OnlyIfMaster: false, + }, + { + // different enabled state "different connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: false, + OnlyIfMaster: false, + }, + { + // different master only state "different connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: true, + }, + { + // identical to s1 "same connection" + Name: "test_db", + Group: "default", + ConnStr: "postgres://user:pass@localhost:5432/db", + Kind: sources.SourcePostgres, + IncludePattern: "prod_*", + ExcludePattern: "test_*", + IsEnabled: true, + OnlyIfMaster: false, + Metrics: map[string]float64{"wal": 60}, + MetricsStandby: map[string]float64{"wal": 60}, + PresetMetrics: "basic", + PresetMetricsStandby: "basic", + CustomTags: map[string]string{"env": "prod"}, + }, + } + + assert.True(t, s1.IsSameConnection(srcs[0])) + assert.False(t, s1.IsSameConnection(srcs[1])) + assert.False(t, s1.IsSameConnection(srcs[2])) + assert.False(t, s1.IsSameConnection(srcs[3])) + assert.False(t, s1.IsSameConnection(srcs[4])) + assert.False(t, s1.IsSameConnection(srcs[5])) + assert.False(t, s1.IsSameConnection(srcs[6])) + assert.False(t, s1.IsSameConnection(srcs[7])) + assert.False(t, s1.IsSameConnection(srcs[8])) + assert.True(t, s1.IsSameConnection(srcs[9])) +}