Skip to content

Commit 16d9a54

Browse files
authored
[Feature] Parallel Executor (#1916)
1 parent 90378a5 commit 16d9a54

File tree

5 files changed

+327
-1
lines changed

5 files changed

+327
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
44
- (Feature) (Platform) MetaV1 Integration Service
55
- (Feature) (Platform) Chart Overrides
6+
- (Feature) Parallel Executor
67

78
## [1.2.49](https://github.com/arangodb/kube-arangodb/tree/1.2.49) (2025-06-17)
89
- (Maintenance) Optimize go.mod

pkg/deployment/resources/pod_inspector.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
3636
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
3737
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
38+
"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
3839
"github.com/arangodb/kube-arangodb/pkg/metrics"
3940
"github.com/arangodb/kube-arangodb/pkg/util"
4041
"github.com/arangodb/kube-arangodb/pkg/util/constants"
@@ -106,7 +107,7 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
106107

107108
spec := r.context.GetSpec()
108109
groupSpec := spec.GetServerGroupSpec(group)
109-
coreContainers := spec.GetCoreContainers(group)
110+
coreContainers := getPodCoreContainers(spec.GetCoreContainers(group), pod.Spec.Containers)
110111

111112
if c, ok := memberStatus.Conditions.Get(api.ConditionTypeUpdating); ok {
112113
if v, ok := c.Params[api.ConditionParamContainerUpdatingName]; ok {
@@ -523,3 +524,15 @@ func removeLabel(labels map[string]string, key string) map[string]string {
523524

524525
return labels
525526
}
527+
528+
func getPodCoreContainers(coreContainers utils.StringList, containers []core.Container) utils.StringList {
529+
r := make(utils.StringList, 0, len(coreContainers))
530+
531+
for _, container := range containers {
532+
if coreContainers.Has(container.Name) {
533+
r = append(r, container.Name)
534+
}
535+
}
536+
537+
return r
538+
}

pkg/util/executor/executor.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2025 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package executor
22+
23+
import (
24+
"context"
25+
"sync"
26+
"time"
27+
28+
"github.com/rs/zerolog"
29+
30+
"github.com/arangodb/kube-arangodb/pkg/logging"
31+
"github.com/arangodb/kube-arangodb/pkg/util/errors"
32+
)
33+
34+
func Run(ctx context.Context, log logging.Logger, threads int, f RunFunc) error {
35+
h := &handler{
36+
th: NewThreadManager(threads),
37+
completed: make(chan struct{}),
38+
log: log,
39+
}
40+
41+
go h.run(ctx, f)
42+
43+
return h.Wait()
44+
}
45+
46+
type RunFunc func(ctx context.Context, log logging.Logger, t Thread, h Handler) error
47+
48+
type Executor interface {
49+
Completed() bool
50+
51+
Wait() error
52+
}
53+
54+
type Handler interface {
55+
RunAsync(ctx context.Context, f RunFunc) Executor
56+
57+
WaitForSubThreads(t Thread)
58+
}
59+
60+
type handler struct {
61+
lock sync.Mutex
62+
63+
th ThreadManager
64+
65+
handlers []*handler
66+
67+
completed chan struct{}
68+
69+
log logging.Logger
70+
71+
err error
72+
}
73+
74+
func (h *handler) WaitForSubThreads(t Thread) {
75+
for {
76+
t.Release()
77+
78+
if h.subThreadsCompleted() {
79+
return
80+
}
81+
82+
time.Sleep(10 * time.Millisecond)
83+
}
84+
}
85+
86+
func (h *handler) subThreadsCompleted() bool {
87+
h.lock.Lock()
88+
defer h.lock.Unlock()
89+
90+
for id := range h.handlers {
91+
if !h.handlers[id].Completed() {
92+
return false
93+
}
94+
}
95+
96+
return true
97+
}
98+
99+
func (h *handler) Wait() error {
100+
<-h.completed
101+
102+
return h.err
103+
}
104+
105+
func (h *handler) Completed() bool {
106+
select {
107+
case <-h.completed:
108+
return true
109+
default:
110+
return false
111+
}
112+
}
113+
114+
func (h *handler) RunAsync(ctx context.Context, f RunFunc) Executor {
115+
h.lock.Lock()
116+
defer h.lock.Unlock()
117+
118+
n := &handler{
119+
th: h.th,
120+
completed: make(chan struct{}),
121+
log: h.log,
122+
}
123+
124+
h.handlers = append(h.handlers, n)
125+
126+
go n.run(ctx, f)
127+
128+
return n
129+
}
130+
131+
func (h *handler) run(ctx context.Context, entry RunFunc) {
132+
defer close(h.completed)
133+
134+
err := h.runE(ctx, entry)
135+
136+
subErrors := make([]error, len(h.handlers))
137+
138+
for id := range subErrors {
139+
subErrors[id] = h.handlers[id].Wait()
140+
}
141+
142+
subError := errors.Errors(subErrors...)
143+
144+
h.err = errors.Errors(err, subError)
145+
}
146+
147+
func (h *handler) runE(ctx context.Context, entry RunFunc) error {
148+
t := h.th.Acquire()
149+
defer t.Release()
150+
151+
log := h.log.Wrap(func(in *zerolog.Event) *zerolog.Event {
152+
return in.Int("thread", int(t.ID()))
153+
})
154+
155+
return entry(ctx, log, t, h)
156+
}

pkg/util/executor/executor_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2025 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package executor
22+
23+
import (
24+
"context"
25+
"testing"
26+
27+
"github.com/stretchr/testify/require"
28+
29+
"github.com/arangodb/kube-arangodb/pkg/logging"
30+
)
31+
32+
func Test_Executor(t *testing.T) {
33+
ctx := context.Background()
34+
35+
logger := logging.Global().RegisterAndGetLogger("test", logging.Trace)
36+
37+
require.NoError(t, Run(ctx, logger, 1, func(ctx context.Context, log logging.Logger, th Thread, h Handler) error {
38+
log.Info("Start main thread")
39+
defer log.Info("Complete main thread")
40+
41+
h.RunAsync(ctx, func(ctx context.Context, log logging.Logger, th Thread, h Handler) error {
42+
log.Info("Start second thread")
43+
defer log.Info("Complete second thread")
44+
45+
h.RunAsync(ctx, func(ctx context.Context, log logging.Logger, th Thread, h Handler) error {
46+
log.Info("Start third thread")
47+
defer log.Info("Complete third thread")
48+
49+
return nil
50+
})
51+
52+
return nil
53+
})
54+
55+
h.WaitForSubThreads(th)
56+
57+
return nil
58+
}))
59+
}

pkg/util/executor/threader.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2025 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package executor
22+
23+
import "sync"
24+
25+
func NewThreadManager(threads int) ThreadManager {
26+
r := make(chan ThreadID, threads)
27+
28+
for id := 0; id < threads; id++ {
29+
r <- ThreadID(id)
30+
}
31+
32+
return &threadManager{
33+
threads: r,
34+
}
35+
}
36+
37+
type ThreadID int
38+
39+
type ThreadManager interface {
40+
Acquire() Thread
41+
}
42+
43+
type threadManager struct {
44+
threads chan ThreadID
45+
}
46+
47+
func (t *threadManager) Acquire() Thread {
48+
id := <-t.threads
49+
50+
return &thread{
51+
parent: t,
52+
id: id,
53+
}
54+
}
55+
56+
type Thread interface {
57+
ID() ThreadID
58+
59+
Release()
60+
Wait()
61+
}
62+
63+
type thread struct {
64+
lock sync.Mutex
65+
66+
parent *threadManager
67+
68+
released bool
69+
70+
id ThreadID
71+
}
72+
73+
func (t *thread) ID() ThreadID {
74+
return t.id
75+
}
76+
77+
func (t *thread) Release() {
78+
t.lock.Lock()
79+
defer t.lock.Unlock()
80+
81+
if t.released {
82+
return
83+
}
84+
85+
t.released = true
86+
87+
t.parent.threads <- t.id
88+
}
89+
90+
func (t *thread) Wait() {
91+
t.lock.Lock()
92+
defer t.lock.Unlock()
93+
94+
t.parent.threads <- t.id
95+
96+
t.id = <-t.parent.threads
97+
}

0 commit comments

Comments
 (0)