Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion internal/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,23 @@ func (r *Reaper) LoadSources(ctx context.Context) (err error) {
newSrcs[i] = md
continue
}
if md.IsSameConnection(newMD.Source) {
// same connection config, then update the existing struct with new metrics and tags
r.logger.WithField("source", md.Name).Info("Metric OR Tag configs changed, updating without full restart")

// lock to ensure data integrity
md.Lock()
md.Metrics = newMD.Metrics
md.MetricsStandby = newMD.MetricsStandby
md.PresetMetrics = newMD.PresetMetrics
md.PresetMetricsStandby = newMD.PresetMetricsStandby
md.CustomTags = newMD.CustomTags
md.Unlock()
newSrcs[i] = md
continue
}

// Source configs changed, stop all running gatherers to trigger a restart
// TODO: Optimize this for single metric addition/deletion/interval-change cases to not do a full restart
r.logger.WithField("source", md.Name).Info("Source configs changed, restarting all gatherers...")
r.ShutdownOldWorkers(ctx, map[string]bool{md.Name: true})
}
Expand Down
178 changes: 131 additions & 47 deletions internal/reaper/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestReaper_LoadSources(t *testing.T) {
assert.Equal(t, 5, len(r.monitoredSources), "Expected five monitored sources after resetting groups")
})

t.Run("Test source config changes trigger restart", func(t *testing.T) {
t.Run("Test source connection config changes trigger restart", func(t *testing.T) {
baseSource := sources.Source{
Name: "TestSource",
IsEnabled: true,
Expand All @@ -115,108 +115,180 @@ func TestReaper_LoadSources(t *testing.T) {
testCases := []struct {
name string
modifySource func(s *sources.Source)
expectCancel bool
}{
{
name: "connection string change",
modifySource: func(s *sources.Source) {
s.ConnStr = "postgres://localhost:5433/newdb"
},
},
{
name: "group change",
modifySource: func(s *sources.Source) {
s.Group = "new-group"
},
},
{
name: "kind change",
modifySource: func(s *sources.Source) {
s.Kind = sources.SourcePgBouncer
},
},
{
name: "only if master change",
modifySource: func(s *sources.Source) {
s.OnlyIfMaster = true
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
initialSource := *baseSource.Clone()
initialReader := &testutil.MockSourcesReaderWriter{
GetSourcesFunc: func() (sources.Sources, error) {
return sources.Sources{initialSource}, nil
},
}

r := NewReaper(ctx, &cmdopts.Options{
SourcesReaderWriter: initialReader,
SinksWriter: &sinks.MultiWriter{},
})
assert.NoError(t, r.LoadSources(ctx))
assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after initial load")

mockConn, err := pgxmock.NewPool()
require.NoError(t, err)
mockConn.ExpectClose()
r.monitoredSources[0].Conn = mockConn

// Add a mock cancel function for a metric gatherer
cancelCalled := make(map[string]bool)
for metric := range initialSource.Metrics {
dbMetric := initialSource.Name + "¤¤¤" + metric
r.cancelFuncs[dbMetric] = func() {
cancelCalled[dbMetric] = true
}
}

// Create modified source
modifiedSource := *baseSource.Clone()
tc.modifySource(&modifiedSource)

modifiedReader := &testutil.MockSourcesReaderWriter{
GetSourcesFunc: func() (sources.Sources, error) {
return sources.Sources{modifiedSource}, nil
},
}
r.SourcesReaderWriter = modifiedReader

// Reload sources
assert.NoError(t, r.LoadSources(ctx))
assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload")
assert.Equal(t, modifiedSource, r.monitoredSources[0].Source)

for metric := range initialSource.Metrics {
dbMetric := initialSource.Name + "¤¤¤" + metric
// Since it is a connection config change, the cancel function should be called for each metric
assert.True(t, cancelCalled[dbMetric])

assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met")
_, exists := r.cancelFuncs[dbMetric]
assert.False(t, exists, "Expected cancel func to be removed from map after cancellation")
}
})
}
})

t.Run("Test metric or tag config changes trigger update", func(t *testing.T) {
baseSource := sources.Source{
Name: "TestSource",
IsEnabled: true,
Kind: sources.SourcePostgres,
ConnStr: "postgres://localhost:5432/testdb",
Metrics: map[string]float64{"cpu": 10, "memory": 20},
MetricsStandby: map[string]float64{"cpu": 30},
CustomTags: map[string]string{"env": "test"},
Group: "default",
}

testCases := []struct {
name string
modifySource func(s *sources.Source)
expectedCancelled []string // List of metrics that shold be stopped
}{
{
name: "custom tags change",
modifySource: func(s *sources.Source) {
s.CustomTags = map[string]string{"env": "production"}
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new tags
},
{
name: "custom tags add new tag",
modifySource: func(s *sources.Source) {
s.CustomTags = map[string]string{"env": "test", "region": "us-east"}
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new tags
},
{
name: "custom tags remove tag",
modifySource: func(s *sources.Source) {
s.CustomTags = map[string]string{}
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new tags
},
{
name: "preset metrics change",
modifySource: func(s *sources.Source) {
s.PresetMetrics = "exhaustive"
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new preset that will be applied on next iteration
},
{
name: "preset standby metrics change",
modifySource: func(s *sources.Source) {
s.PresetMetricsStandby = "standby-preset"
},
expectCancel: true,
},
{
name: "connection string change",
modifySource: func(s *sources.Source) {
s.ConnStr = "postgres://localhost:5433/newdb"
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new preset that will be applied on next iteration
},
{
name: "custom metrics change interval",
modifySource: func(s *sources.Source) {
s.Metrics = map[string]float64{"cpu": 15, "memory": 20}
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new interval for cpu metric
},
{
name: "custom metrics add new metric",
modifySource: func(s *sources.Source) {
s.Metrics = map[string]float64{"cpu": 10, "memory": 20, "disk": 30}
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new metric
},
{
name: "custom metrics remove metric",
modifySource: func(s *sources.Source) {
s.Metrics = map[string]float64{"cpu": 10}
},
expectCancel: true,
expectedCancelled: []string{"memory"}, // Only memory metric should be cancelled
},
{
name: "standby metrics change",
modifySource: func(s *sources.Source) {
s.MetricsStandby = map[string]float64{"cpu": 60}
},
expectCancel: true,
},
{
name: "group change",
modifySource: func(s *sources.Source) {
s.Group = "new-group"
},
expectCancel: true,
},
{
name: "kind change",
modifySource: func(s *sources.Source) {
s.Kind = sources.SourcePgBouncer
},
expectCancel: true,
},
{
name: "only if master change",
modifySource: func(s *sources.Source) {
s.OnlyIfMaster = true
},
expectCancel: true,
expectedCancelled: nil, // No metrics should be cancelled, just updated with new standby metrics
},
{
name: "no change - same config",
modifySource: func(_ *sources.Source) {
// No modifications - source stays the same
},
expectCancel: false,
expectedCancelled: nil, // No metrics should be cancelled
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
initialSource := *baseSource.Clone()
Expand All @@ -235,7 +307,8 @@ func TestReaper_LoadSources(t *testing.T) {

mockConn, err := pgxmock.NewPool()
require.NoError(t, err)
mockConn.ExpectClose()

// no close expectation set on mockConn here because we should reuse the same connecrion for metric or tag config changes
r.monitoredSources[0].Conn = mockConn

// Add a mock cancel function for a metric gatherer
Expand All @@ -247,7 +320,7 @@ func TestReaper_LoadSources(t *testing.T) {
}
}

// Create modified source
// create modified source
modifiedSource := *baseSource.Clone()
tc.modifySource(&modifiedSource)

Expand All @@ -258,16 +331,27 @@ func TestReaper_LoadSources(t *testing.T) {
}
r.SourcesReaderWriter = modifiedReader

// Reload sources
// reload sources
assert.NoError(t, r.LoadSources(ctx))
// call ShutdownOldWorkers to simulate actual behavior and ensure cancel functions are called
r.ShutdownOldWorkers(ctx, nil)

assert.Equal(t, 1, len(r.monitoredSources), "Expected one monitored source after reload")
assert.Equal(t, modifiedSource, r.monitoredSources[0].Source)

for metric := range initialSource.Metrics {
dbMetric := initialSource.Name + "¤¤¤" + metric
assert.Equal(t, tc.expectCancel, cancelCalled[dbMetric])
if tc.expectCancel {
assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met")

shouldBeCancelled := false
for _, m := range tc.expectedCancelled {
if m == metric {
shouldBeCancelled = true
break
}
}
assert.Equal(t, shouldBeCancelled, cancelCalled[dbMetric])
assert.Nil(t, mockConn.ExpectationsWereMet(), "Expected all mock expectations to be met")
if shouldBeCancelled {
_, exists := r.cancelFuncs[dbMetric]
assert.False(t, exists, "Expected cancel func to be removed from map after cancellation")
}
Expand Down
12 changes: 12 additions & 0 deletions internal/sources/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ func (s *Source) Clone() *Source {
return c
}

// to check if connection configs are the same, then update the metrics and tags without a full restart
func (s Source) IsSameConnection(s2 Source) bool {
return s.Name == s2.Name &&
s.Group == s2.Group &&
s.ConnStr == s2.ConnStr &&
s.Kind == s2.Kind &&
s.IsEnabled == s2.IsEnabled &&
s.IncludePattern == s2.IncludePattern &&
s.ExcludePattern == s2.ExcludePattern &&
s.OnlyIfMaster == s2.OnlyIfMaster
}

type Reader interface {
GetSources() (Sources, error)
}
Expand Down
Loading