Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
plugins.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory)
plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
plugins.Register(scorer.RunningQueueSizeScorerType, scorer.RunningQueueSizeScorerFactory)
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
// Latency predictor plugins
plugins.Register(slo_aware_router.SLOAwareRouterPluginType, slo_aware_router.SLOAwareRouterFactory)
Expand Down
1 change: 1 addition & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (

KVCacheUsagePercentKey = "KVCacheUsagePercent"
WaitingQueueSizeKey = "WaitingQueueSize"
RunningQueueSizeKey = "RunningQueueSize"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

running requests are not a queue, no?

MaxActiveModelsKey = "MaxActiveModels"
ActiveModelsKey = "ActiveModels"
WaitingModelsKey = "WaitingModels"
Expand Down
104 changes: 104 additions & 0 deletions pkg/epp/scheduling/framework/plugins/scorer/running.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scorer

import (
"context"
"encoding/json"
"math"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// RunningQueueSizeScorerType is the plugin type identifier of
// RunningQueueSizeScorer. It is also used as the scorer's default instance name.
const RunningQueueSizeScorerType = "running-queue-size-scorer"

// Compile-time check that *RunningQueueSizeScorer implements framework.Scorer.
var _ framework.Scorer = (*RunningQueueSizeScorer)(nil)

// RunningQueueSizeScorerFactory is the plugin factory for RunningQueueSizeScorer.
// It builds a scorer with default settings and assigns it the given instance
// name. The raw JSON parameters and the plugin handle are ignored, and the
// returned error is always nil.
func RunningQueueSizeScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
	s := NewRunningQueueSizeScorer()
	s.WithName(name)
	return s, nil
}

// NewRunningQueueSizeScorer returns a pointer to a freshly allocated
// RunningQueueSizeScorer. Both the type and the instance name are initialised
// to RunningQueueSizeScorerType; use WithName to override the instance name.
func NewRunningQueueSizeScorer() *RunningQueueSizeScorer {
	defaultName := plugins.TypedName{
		Type: RunningQueueSizeScorerType,
		Name: RunningQueueSizeScorerType,
	}
	return &RunningQueueSizeScorer{typedName: defaultName}
}

// RunningQueueSizeScorer is a scorer plugin that ranks candidate pods by their
// RunningQueueSize metric. The fewer running requests a pod reports, the higher
// its score, since it has more capacity available to serve a new request.
type RunningQueueSizeScorer struct {
	// typedName holds the plugin type and the instance name of this scorer.
	typedName plugins.TypedName
}

// TypedName reports the (type, name) pair identifying this plugin instance.
func (s *RunningQueueSizeScorer) TypedName() plugins.TypedName {
	identity := s.typedName
	return identity
}

// Consumes reports the data this plugin reads. The result has a single entry:
// the RunningQueueSize metric key mapped to a zero int value.
// NOTE(review): the int value presumably acts as a type exemplar for the
// consumed data; confirm against the code that reads this map.
func (s *RunningQueueSizeScorer) Consumes() map[string]any {
	consumed := make(map[string]any, 1)
	consumed[metrics.RunningQueueSizeKey] = int(0)
	return consumed
}

// WithName overrides the instance name of the scorer, leaving its type
// untouched. The receiver is modified in place and returned so that calls can
// be chained.
func (s *RunningQueueSizeScorer) WithName(name string) *RunningQueueSizeScorer {
	s.typedName.Name = name
	return s
}

// Score returns the scoring result for the given list of pods based on context.
func (s *RunningQueueSizeScorer) Score(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 {
minQueueSize := math.MaxInt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix the names please, this is not queue, it is running

maxQueueSize := math.MinInt

// Iterate through the remaining pods to find min and max
for _, pod := range pods {
queueSize := pod.GetMetrics().RunningQueueSize
if queueSize < minQueueSize {
minQueueSize = queueSize
}
if queueSize > maxQueueSize {
maxQueueSize = queueSize
}
}

// podScoreFunc calculates the score based on the queue size of each pod. Longer queue gets a lower score.
podScoreFunc := func(pod types.Pod) float64 {
if maxQueueSize == minQueueSize {
// If all pods have the same queue size, return a neutral score
return 1.0
}
return float64(maxQueueSize-pod.GetMetrics().RunningQueueSize) / float64(maxQueueSize-minQueueSize)
}

// Create a map to hold the scores for each pod
scores := make(map[types.Pod]float64, len(pods))
for _, pod := range pods {
scores[pod] = podScoreFunc(pod)
}
return scores
}
85 changes: 85 additions & 0 deletions pkg/epp/scheduling/framework/plugins/scorer/running_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scorer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// TestRunningQueueSizeScorer checks the scores produced by
// RunningQueueSizeScorer.Score for pods with differing, identical and all-zero
// RunningQueueSize metrics.
func TestRunningQueueSizeScorer(t *testing.T) {
	// podWithRunning builds a candidate pod whose only populated metric is
	// RunningQueueSize.
	podWithRunning := func(running int) types.Pod {
		return &types.PodMetrics{
			Pod:          &backend.Pod{},
			MetricsState: &backendmetrics.MetricsState{RunningQueueSize: running},
		}
	}

	cases := []struct {
		name              string
		pods              []types.Pod
		expectedScoresPod map[int]float64 // expected score keyed by pod index
	}{
		{
			name: "Different running queue sizes",
			pods: []types.Pod{
				podWithRunning(10),
				podWithRunning(5),
				podWithRunning(0),
			},
			expectedScoresPod: map[int]float64{
				0: 0.0, // most running requests (10): lowest score
				1: 0.5, // halfway between the extremes (5): middle score
				2: 1.0, // fewest running requests (0): highest score
			},
		},
		{
			name: "Same running queue sizes",
			pods: []types.Pod{
				podWithRunning(5),
				podWithRunning(5),
			},
			expectedScoresPod: map[int]float64{
				0: 1.0, // identical load on every pod: all get the same score
				1: 1.0,
			},
		},
		{
			name: "Zero running queue sizes",
			pods: []types.Pod{
				podWithRunning(0),
				podWithRunning(0),
			},
			expectedScoresPod: map[int]float64{
				0: 1.0,
				1: 1.0,
			},
		},
	}

	plugin := &RunningQueueSizeScorer{}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := plugin.Score(context.Background(), types.NewCycleState(), &types.LLMRequest{}, tc.pods)

			for idx, pod := range tc.pods {
				want := tc.expectedScoresPod[idx]
				assert.InDelta(t, want, got[pod], 0.0001, "Pod %d should have score %f", idx, want)
			}
		})
	}
}