Skip to content

Commit ecb3482

Browse files
authored
[Feature] Reconciliation Loop Interval option (#1395)
1 parent 9ad1893 commit ecb3482

File tree

9 files changed

+175
-1
lines changed

9 files changed

+175
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
- (Bugfix) Fix CRD yaml (chart)
1212
- (Bugfix) (EE) Fix MemberMaintenance Context and ClusterMaintenance discovery
1313
- (Feature) Add proper Prometheus endpoint compression + 204 response code
14+
- (Feature) Reconciliation Loop Interval option
1415

1516
## [1.2.32](https://github.com/arangodb/kube-arangodb/tree/1.2.32) (2023-08-07)
1617
- (Feature) Backup lifetime - remove Backup once its lifetime has been reached

cmd/cmd.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ var (
136136

137137
alpineImage, metricsExporterImage, arangoImage string
138138

139+
reconciliationDelay time.Duration
140+
139141
singleMode bool
140142
scope string
141143
}
@@ -224,6 +226,7 @@ func init() {
224226
f.DurationVar(&shutdownOptions.delay, "shutdown.delay", defaultShutdownDelay, "The delay before running shutdown handlers")
225227
f.DurationVar(&shutdownOptions.timeout, "shutdown.timeout", defaultShutdownTimeout, "Timeout for shutdown handlers")
226228
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
229+
f.DurationVar(&operatorOptions.reconciliationDelay, "reconciliation.delay", 0, "Delay between reconciliation loops (<= 0 -> Disabled)")
227230
f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
228231
f.Float32Var(&operatorKubernetesOptions.qps, "kubernetes.qps", kclient.DefaultQPS, "Number of queries per second for k8s API")
229232
f.IntVar(&operatorKubernetesOptions.burst, "kubernetes.burst", kclient.DefaultBurst, "Burst for the k8s API")
@@ -530,6 +533,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper
530533
ArangoImage: operatorOptions.arangoImage,
531534
SingleMode: operatorOptions.singleMode,
532535
Scope: scope,
536+
ReconciliationDelay: operatorOptions.reconciliationDelay,
533537
ShutdownDelay: shutdownOptions.delay,
534538
ShutdownTimeout: shutdownOptions.timeout,
535539
}

pkg/deployment/deployment.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
6161
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
6262
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
63+
"github.com/arangodb/kube-arangodb/pkg/util/timer"
6364
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
6465
)
6566

@@ -70,6 +71,7 @@ type Config struct {
7071
ScalingIntegrationEnabled bool
7172
OperatorImage string
7273
ArangoImage string
74+
ReconciliationDelay time.Duration
7375
Scope scope.Scope
7476
}
7577

@@ -107,6 +109,8 @@ type Deployment struct {
107109
uid types.UID
108110
namespace string
109111

112+
delayer timer.Delayer
113+
110114
currentObject *api.ArangoDeployment
111115
currentObjectStatus *api.DeploymentStatus
112116
currentObjectLock sync.RWMutex
@@ -256,6 +260,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
256260
stopCh: make(chan struct{}),
257261
agencyCache: agency.NewCache(apiObject.GetNamespace(), apiObject.GetName(), apiObject.GetAcceptedSpec().Mode),
258262
acs: acs.NewACS(apiObject.GetUID(), i),
263+
delayer: timer.NewDelayer(),
259264
}
260265

261266
d.log = logger.WrapObj(d)

pkg/deployment/deployment_inspector.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"context"
2525
"time"
2626

27+
"github.com/rs/zerolog/log"
28+
2729
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
2830
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
2931
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
@@ -51,6 +53,11 @@ var (
5153
func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval {
5254
start := time.Now()
5355

56+
if delay := d.delayer.Wait(); delay > 0 {
57+
log.Info().Dur("delay", delay).Msgf("Reconciliation loop execution was delayed")
58+
}
59+
defer d.delayer.Delay(d.config.ReconciliationDelay)
60+
5461
ctxReconciliation, cancelReconciliation := globals.GetGlobalTimeouts().Reconciliation().WithTimeout(context.Background())
5562
defer cancelReconciliation()
5663
defer func() {

pkg/operator/operator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type Config struct {
106106
ScalingIntegrationEnabled bool
107107
SingleMode bool
108108
Scope scope.Scope
109+
ReconciliationDelay time.Duration
109110
ShutdownDelay time.Duration
110111
ShutdownTimeout time.Duration
111112
}

pkg/operator/operator_deployment.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func (o *Operator) makeDeploymentConfigAndDeps() (deployment.Config, deployment.
201201
ArangoImage: o.ArangoImage,
202202
AllowChaos: o.Config.AllowChaos,
203203
ScalingIntegrationEnabled: o.Config.ScalingIntegrationEnabled,
204+
ReconciliationDelay: o.Config.ReconciliationDelay,
204205
Scope: o.Scope,
205206
}
206207
deps := deployment.Dependencies{

pkg/util/k8sutil/pods.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// DISCLAIMER
33
//
4-
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
4+
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
55
//
66
// Licensed under the Apache License, Version 2.0 (the "License");
77
// you may not use this file except in compliance with the License.

pkg/util/timer/delayer.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package timer
22+
23+
import (
24+
"sync"
25+
"time"
26+
)
27+
28+
func NewDelayer() Delayer {
29+
return &delayer{}
30+
}
31+
32+
type Delayer interface {
33+
Delay(delay time.Duration)
34+
35+
Wait() time.Duration
36+
37+
Copy() Delayer
38+
}
39+
40+
type delayer struct {
41+
lock sync.Mutex
42+
43+
last, next time.Time
44+
}
45+
46+
func (d *delayer) Copy() Delayer {
47+
d.lock.Lock()
48+
defer d.lock.Unlock()
49+
50+
return &delayer{
51+
last: d.last,
52+
next: d.next,
53+
}
54+
}
55+
56+
func (d *delayer) Wait() time.Duration {
57+
d.lock.Lock()
58+
defer d.lock.Unlock()
59+
60+
since := time.Until(d.next)
61+
62+
if since <= time.Millisecond {
63+
return 0
64+
}
65+
66+
time.Sleep(since)
67+
68+
return since
69+
}
70+
71+
func (d *delayer) Delay(delay time.Duration) {
72+
d.lock.Lock()
73+
defer d.lock.Unlock()
74+
75+
d.last = time.Now()
76+
d.next = d.last.Add(delay)
77+
}

pkg/util/timer/delayer_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package timer
22+
23+
import (
24+
"testing"
25+
"time"
26+
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
func withTime(f func()) time.Duration {
31+
now := time.Now()
32+
f()
33+
return time.Since(now)
34+
}
35+
36+
func Test_Delayer(t *testing.T) {
37+
d := NewDelayer()
38+
39+
t.Run("Ensure instant execution", func(t *testing.T) {
40+
require.True(t, withTime(func() {
41+
d.Wait()
42+
}) < time.Millisecond)
43+
44+
require.True(t, withTime(func() {
45+
d.Wait()
46+
}) < time.Millisecond)
47+
})
48+
49+
t.Run("Delay execution", func(t *testing.T) {
50+
require.True(t, withTime(func() {
51+
d.Delay(50 * time.Millisecond)
52+
d.Wait()
53+
}) >= 50*time.Millisecond)
54+
})
55+
56+
t.Run("Delay execution, but allow multiple ones", func(t *testing.T) {
57+
require.True(t, withTime(func() {
58+
d.Delay(50 * time.Millisecond)
59+
d.Wait()
60+
d.Wait()
61+
d.Wait()
62+
d.Wait()
63+
}) >= 50*time.Millisecond)
64+
})
65+
66+
t.Run("Delay execution multiple times", func(t *testing.T) {
67+
require.True(t, withTime(func() {
68+
d.Delay(50 * time.Millisecond)
69+
d.Wait()
70+
d.Delay(50 * time.Millisecond)
71+
d.Wait()
72+
d.Delay(50 * time.Millisecond)
73+
d.Wait()
74+
d.Delay(50 * time.Millisecond)
75+
d.Wait()
76+
}) >= 200*time.Millisecond)
77+
})
78+
}

0 commit comments

Comments
 (0)