Skip to content

Commit 8ce2a04

Browse files
authored
Execute prepare data plugins in topological order of data dependencies (#1878)
* Parallelize execution of prepare data plugins as a DAG. Also detect data dependency cycles on startup. * Use buffered channel based approach to synchronize go routines. Also add unit tests * Fix runner after rebase * Cache DAG to avoid recomputation * Make plugin execution sequential in topologically sorted order * Extract DAG creation out, address review comments
1 parent f56f623 commit 8ce2a04

File tree

7 files changed

+453
-14
lines changed

7 files changed

+453
-14
lines changed

cmd/epp/runner/runner.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,10 @@ func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *conf
481481

482482
// Add requestControl plugins
483483
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)
484+
// Sort prepare data plugins in DAG order (topological sort). Also check prepare data plugins for cycles.
485+
if r.requestControlConfig.PrepareDataPluginGraph() != nil {
486+
return nil, errors.New("failed to load the configuration - prepare data plugins have cyclic dependencies")
487+
}
484488

485489
// Handler deprecated configuration options
486490
r.deprecatedConfigurationHelper(cfg, logger)

pkg/epp/requestcontrol/dag.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package requestcontrol
18+
19+
import (
20+
"errors"
21+
"slices"
22+
)
23+
24+
// buildDAG builds a dependency graph among data preparation plugins based on their
25+
// produced and consumed data keys.
26+
func buildDAG(plugins []PrepareDataPlugin) map[string][]string {
27+
dag := make(map[string][]string)
28+
for _, plugin := range plugins {
29+
dag[plugin.TypedName().String()] = []string{}
30+
}
31+
// Create dependency graph as a DAG.
32+
for i := range plugins {
33+
for j := range plugins {
34+
if i == j {
35+
continue
36+
}
37+
// Check whether plugin[i] produces something consumed by plugin[j]. In that case, j depends on i.
38+
if plugins[i].Produces() != nil && plugins[j].Consumes() != nil {
39+
// For all the keys produced by plugin i, check if plugin j consumes any of them.
40+
// If yes, then j depends on i.
41+
for producedKey := range plugins[i].Produces() {
42+
// If plugin j consumes the produced key, then j depends on i. We can break after the first match.
43+
if _, ok := plugins[j].Consumes()[producedKey]; ok {
44+
iPluginName := plugins[i].TypedName().String()
45+
jPluginName := plugins[j].TypedName().String()
46+
dag[jPluginName] = append(dag[jPluginName], iPluginName)
47+
break
48+
}
49+
}
50+
}
51+
}
52+
}
53+
return dag
54+
}
55+
56+
// sortPlugins builds the dependency graph and returns the plugins ordered in topological order.
57+
// If there is a cycle, it returns an error.
58+
func sortPlugins(dag map[string][]string, plugins []PrepareDataPlugin) ([]PrepareDataPlugin, error) {
59+
nameToPlugin := map[string]PrepareDataPlugin{}
60+
for _, plugin := range plugins {
61+
nameToPlugin[plugin.TypedName().String()] = plugin
62+
}
63+
sortedPlugins, err := topologicalSort(dag)
64+
if err != nil {
65+
return nil, err
66+
}
67+
orderedPlugins := []PrepareDataPlugin{}
68+
for _, pluginName := range sortedPlugins {
69+
orderedPlugins = append(orderedPlugins, nameToPlugin[pluginName])
70+
}
71+
72+
return orderedPlugins, err
73+
}
74+
75+
// TopologicalSort performs Kahn's Algorithm on a DAG.
76+
// It returns the sorted order or an error if a cycle is detected.
77+
func topologicalSort(graph map[string][]string) ([]string, error) {
78+
// 1. Initialize in-degree map
79+
inDegree := make(map[string]int)
80+
81+
// Ensure all nodes are present in the inDegree map, even those with no dependencies
82+
for u, neighbors := range graph {
83+
if _, ok := inDegree[u]; !ok {
84+
inDegree[u] = 0
85+
}
86+
for _, v := range neighbors {
87+
inDegree[v]++ // Increment in-degree for the destination node
88+
}
89+
}
90+
91+
// 2. Initialize the queue with nodes having 0 in-degree
92+
var queue []string
93+
for node, degree := range inDegree {
94+
if degree == 0 {
95+
queue = append(queue, node)
96+
}
97+
}
98+
99+
var result []string
100+
101+
// 3. Process the queue
102+
for len(queue) > 0 {
103+
// Dequeue
104+
u := queue[0]
105+
queue = queue[1:]
106+
107+
result = append(result, u)
108+
109+
// Decrease in-degree of neighbors
110+
if neighbors, ok := graph[u]; ok {
111+
for _, v := range neighbors {
112+
inDegree[v]--
113+
if inDegree[v] == 0 {
114+
queue = append(queue, v)
115+
}
116+
}
117+
}
118+
}
119+
120+
// 4. Check for cycles
121+
// If the result size != total nodes, there is a cycle
122+
if len(result) != len(inDegree) {
123+
return nil, errors.New("cycle detected: graph is not a DAG")
124+
}
125+
slices.Reverse(result)
126+
return result, nil
127+
}

pkg/epp/requestcontrol/dag_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package requestcontrol
18+
19+
import (
20+
"context"
21+
"maps"
22+
"testing"
23+
24+
"github.com/google/go-cmp/cmp"
25+
"github.com/stretchr/testify/assert"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
28+
)
29+
30+
type mockPrepareRequestDataP struct {
31+
name string
32+
produces map[string]any
33+
consumes map[string]any
34+
}
35+
36+
func (m *mockPrepareRequestDataP) TypedName() plugins.TypedName {
37+
return plugins.TypedName{Name: m.name, Type: "mock"}
38+
}
39+
40+
func (m *mockPrepareRequestDataP) Produces() map[string]any {
41+
return m.produces
42+
}
43+
44+
func (m *mockPrepareRequestDataP) Consumes() map[string]any {
45+
return m.consumes
46+
}
47+
48+
func (m *mockPrepareRequestDataP) PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error {
49+
pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42})
50+
return nil
51+
}
52+
53+
func TestPrepareDataGraph(t *testing.T) {
54+
pluginA := &mockPrepareRequestDataP{name: "A", produces: map[string]any{"keyA": nil}}
55+
pluginB := &mockPrepareRequestDataP{name: "B", consumes: map[string]any{"keyA": nil}, produces: map[string]any{"keyB": nil}}
56+
pluginC := &mockPrepareRequestDataP{name: "C", consumes: map[string]any{"keyB": nil}}
57+
pluginD := &mockPrepareRequestDataP{name: "D", consumes: map[string]any{"keyA": nil}}
58+
pluginE := &mockPrepareRequestDataP{name: "E"} // No dependencies
59+
60+
// Cycle plugins
61+
pluginX := &mockPrepareRequestDataP{name: "X", produces: map[string]any{"keyX": nil}, consumes: map[string]any{"keyY": nil}}
62+
pluginY := &mockPrepareRequestDataP{name: "Y", produces: map[string]any{"keyY": nil}, consumes: map[string]any{"keyX": nil}}
63+
64+
testCases := []struct {
65+
name string
66+
plugins []PrepareDataPlugin
67+
expectedDAG map[string][]string
68+
expectError bool
69+
}{
70+
{
71+
name: "No plugins",
72+
plugins: []PrepareDataPlugin{},
73+
expectedDAG: map[string][]string{},
74+
expectError: false,
75+
},
76+
{
77+
name: "Plugins with no dependencies",
78+
plugins: []PrepareDataPlugin{pluginA, pluginE},
79+
expectedDAG: map[string][]string{
80+
"A/mock": {},
81+
"E/mock": {},
82+
},
83+
expectError: false,
84+
},
85+
{
86+
name: "Simple linear dependency (C -> B -> A)",
87+
plugins: []PrepareDataPlugin{pluginA, pluginB, pluginC},
88+
expectedDAG: map[string][]string{
89+
"A/mock": {},
90+
"B/mock": {"A/mock"},
91+
"C/mock": {"B/mock"},
92+
},
93+
expectError: false,
94+
},
95+
{
96+
name: "DAG with multiple dependencies (B -> A, D -> A, E independent)",
97+
plugins: []PrepareDataPlugin{pluginA, pluginB, pluginD, pluginE},
98+
expectedDAG: map[string][]string{
99+
"A/mock": {},
100+
"B/mock": {"A/mock"},
101+
"D/mock": {"A/mock"},
102+
"E/mock": {},
103+
},
104+
expectError: false,
105+
},
106+
{
107+
name: "Graph with a cycle (X -> Y, Y -> X)",
108+
plugins: []PrepareDataPlugin{pluginX, pluginY},
109+
expectedDAG: nil,
110+
expectError: true,
111+
},
112+
}
113+
114+
for _, tc := range testCases {
115+
t.Run(tc.name, func(t *testing.T) {
116+
dag := buildDAG(tc.plugins)
117+
orderedPlugins, err := sortPlugins(dag, tc.plugins)
118+
119+
if tc.expectError {
120+
assert.Error(t, err)
121+
assert.Contains(t, err.Error(), "cycle detected")
122+
return
123+
}
124+
assert.NoError(t, err)
125+
126+
// Normalize the slices in the maps for consistent comparison
127+
normalizedDAG := make(map[string][]string)
128+
maps.Copy(normalizedDAG, dag)
129+
normalizedExpectedDAG := make(map[string][]string)
130+
for k, v := range tc.expectedDAG {
131+
normalizedExpectedDAG[k] = v
132+
}
133+
134+
if diff := cmp.Diff(normalizedExpectedDAG, normalizedDAG); diff != "" {
135+
t.Errorf("prepareDataGraph() mismatch (-want +got):\n%s", diff)
136+
}
137+
138+
orderedPluginNames := make([]string, len(orderedPlugins))
139+
for i, p := range orderedPlugins {
140+
orderedPluginNames[i] = p.TypedName().String()
141+
}
142+
assertTopologicalOrder(t, dag, orderedPlugins)
143+
})
144+
}
145+
}
146+
147+
func assertTopologicalOrder(t *testing.T, dag map[string][]string, ordered []PrepareDataPlugin) {
148+
t.Helper()
149+
positions := make(map[string]int)
150+
for i, p := range ordered {
151+
positions[p.TypedName().String()] = i
152+
}
153+
154+
for node, dependencies := range dag {
155+
for _, dep := range dependencies {
156+
assert.Less(t, positions[dep], positions[node], "Dependency %s should come before %s", dep, node)
157+
}
158+
}
159+
}

pkg/epp/requestcontrol/director.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,9 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
351351
}
352352
}
353353

354-
// TODO: Execute plugins in parallel once DAG execution is supported.
355-
// runPrepareDataPlugins executes PrepareDataPlugins sequentially.
356354
func (d *Director) runPrepareDataPlugins(ctx context.Context,
357355
request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
358-
return prepareDataPluginsWithTimeout(
359-
prepareDataTimeout, d.requestControlPlugins.prepareDataPlugins, ctx, request, pods)
360-
356+
return prepareDataPluginsWithTimeout(prepareDataTimeout, d.requestControlPlugins.prepareDataPlugins, ctx, request, pods)
361357
}
362358

363359
func (d *Director) runAdmissionPlugins(ctx context.Context,

pkg/epp/requestcontrol/plugin_executor.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,24 @@ import (
2424
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2525
)
2626

27+
// executePluginsAsDAG executes PrepareData plugins as a DAG based on their dependencies asynchronously.
28+
// So, a plugin is executed only after all its dependencies have been executed.
29+
// If there is a cycle or any plugin fails with error, it returns an error.
30+
func executePluginsAsDAG(plugins []PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
31+
for _, plugin := range plugins {
32+
if err := plugin.PrepareRequestData(ctx, request, pods); err != nil {
33+
return errors.New("prepare data plugin " + plugin.TypedName().String() + " failed: " + err.Error())
34+
}
35+
}
36+
return nil
37+
}
38+
2739
// prepareDataPluginsWithTimeout executes the PrepareRequestData plugins with retries and timeout.
2840
func prepareDataPluginsWithTimeout(timeout time.Duration, plugins []PrepareDataPlugin,
2941
ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
3042
errCh := make(chan error, 1)
31-
// Execute plugins sequentially in a separate goroutine
3243
go func() {
33-
for _, plugin := range plugins {
34-
err := plugin.PrepareRequestData(ctx, request, pods)
35-
if err != nil {
36-
errCh <- errors.New("prepare data plugin " + plugin.TypedName().String() + " failed: " + err.Error())
37-
return
38-
}
39-
}
40-
errCh <- nil
44+
errCh <- executePluginsAsDAG(plugins, ctx, request, pods)
4145
}()
4246

4347
select {

0 commit comments

Comments
 (0)