diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index f02a36557..da98396c7 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -432,6 +432,7 @@ func (r *Runner) registerInTreePlugins() { plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory) plugins.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory) plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory) + plugins.Register(scorer.RunningQueueSizeScorerType, scorer.RunningQueueSizeScorerFactory) plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory) // Latency predictor plugins plugins.Register(slo_aware_router.SLOAwareRouterPluginType, slo_aware_router.SLOAwareRouterFactory) diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go index 44c3be87d..0b41be647 100644 --- a/pkg/epp/metrics/metrics.go +++ b/pkg/epp/metrics/metrics.go @@ -37,6 +37,7 @@ const ( KVCacheUsagePercentKey = "KVCacheUsagePercent" WaitingQueueSizeKey = "WaitingQueueSize" + RunningQueueSizeKey = "RunningQueueSize" MaxActiveModelsKey = "MaxActiveModels" ActiveModelsKey = "ActiveModels" WaitingModelsKey = "WaitingModels" diff --git a/pkg/epp/scheduling/framework/plugins/scorer/running.go b/pkg/epp/scheduling/framework/plugins/scorer/running.go new file mode 100644 index 000000000..5b586f8e8 --- /dev/null +++ b/pkg/epp/scheduling/framework/plugins/scorer/running.go @@ -0,0 +1,104 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scorer + +import ( + "context" + "encoding/json" + "math" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +const ( + RunningQueueSizeScorerType = "running-queue-size-scorer" +) + +// Compile-time assertion that *RunningQueueSizeScorer satisfies framework.Scorer. +var _ framework.Scorer = &RunningQueueSizeScorer{} + +// RunningQueueSizeScorerFactory is the plugin factory for RunningQueueSizeScorer: it returns a new scorer named name and a nil error; the json.RawMessage and plugins.Handle arguments are unused. +func RunningQueueSizeScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewRunningQueueSizeScorer().WithName(name), nil +} + +// NewRunningQueueSizeScorer returns a pointer to a new RunningQueueSizeScorer whose type and name are both set to RunningQueueSizeScorerType. +func NewRunningQueueSizeScorer() *RunningQueueSizeScorer { + return &RunningQueueSizeScorer{ + typedName: plugins.TypedName{Type: RunningQueueSizeScorerType, Name: RunningQueueSizeScorerType}, + } +} + +// RunningQueueSizeScorer scores a list of candidate pods based on each pod's running queue size. +// The smaller a pod's running queue size, the higher the score it gets (since it is more available to serve new requests). +type RunningQueueSizeScorer struct { + typedName plugins.TypedName +} + +// TypedName returns the type and name tuple of this plugin instance. +func (s *RunningQueueSizeScorer) TypedName() plugins.TypedName { + return s.typedName +} + +// Consumes returns the data consumed by the plugin: a map with a single metrics.RunningQueueSizeKey entry mapped to int(0). +func (s *RunningQueueSizeScorer) Consumes() map[string]any { + return map[string]any{ + metrics.RunningQueueSizeKey: int(0), + } +} + +// WithName sets the name of the scorer and returns the same scorer. 
+func (s *RunningQueueSizeScorer) WithName(name string) *RunningQueueSizeScorer { + s.typedName.Name = name + return s +} + +// Score returns a score for each of the given pods based on its RunningQueueSize; the context, cycle state and request arguments are unused. +func (s *RunningQueueSizeScorer) Score(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 { + minQueueSize := math.MaxInt + maxQueueSize := math.MinInt + + // Iterate through the pods to find the min and max running queue sizes + for _, pod := range pods { + queueSize := pod.GetMetrics().RunningQueueSize + if queueSize < minQueueSize { + minQueueSize = queueSize + } + if queueSize > maxQueueSize { + maxQueueSize = queueSize + } + } + + // podScoreFunc linearly normalizes a pod's running queue size into [0, 1]: the min size scores 1.0 and the max size scores 0.0, so a longer queue gets a lower score. + podScoreFunc := func(pod types.Pod) float64 { + if maxQueueSize == minQueueSize { + // If all pods have the same queue size, return a neutral score (this also avoids a zero denominator below) + return 1.0 + } + return float64(maxQueueSize-pod.GetMetrics().RunningQueueSize) / float64(maxQueueSize-minQueueSize) + } + + // Build the result map, pre-sized to the number of pods + scores := make(map[types.Pod]float64, len(pods)) + for _, pod := range pods { + scores[pod] = podScoreFunc(pod) + } + return scores +} diff --git a/pkg/epp/scheduling/framework/plugins/scorer/running_test.go b/pkg/epp/scheduling/framework/plugins/scorer/running_test.go new file mode 100644 index 000000000..653291631 --- /dev/null +++ b/pkg/epp/scheduling/framework/plugins/scorer/running_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scorer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +func TestRunningQueueSizeScorer(t *testing.T) { + tests := []struct { + name string + pods []types.Pod + expectedScoresPod map[int]float64 // Expected score for each pod, keyed by the pod's index in pods + }{ + { + name: "Different running queue sizes", + pods: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 10}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 5}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 0}}, + }, + expectedScoresPod: map[int]float64{ + 0: 0.0, // Longest queue (10) gets the lowest score: (10-10)/(10-0) + 1: 0.5, // Medium queue (5) gets the midpoint score: (10-5)/(10-0) + 2: 1.0, // Shortest queue (0) gets the highest score: (10-0)/(10-0) + }, + }, + { + name: "Same running queue sizes", + pods: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 5}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 5}}, + }, + expectedScoresPod: map[int]float64{ + 0: 1.0, // When all pods have the same queue size, they all get the same neutral score of 1.0 + 1: 1.0, + }, + }, + { + name: "Zero running queue sizes", + pods: []types.Pod{ + 
&types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 0}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningQueueSize: 0}}, + }, + expectedScoresPod: map[int]float64{ + 0: 1.0, + 1: 1.0, + }, + }, + } + + scorer := &RunningQueueSizeScorer{} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + scores := scorer.Score(context.Background(), types.NewCycleState(), &types.LLMRequest{}, test.pods) + + for i, pod := range test.pods { + expectedScore := test.expectedScoresPod[i] + assert.InDelta(t, expectedScore, scores[pod], 0.0001, "Pod %d should have score %f", i, expectedScore) + } + }) + } +}